Merge pull request #2824 from ikaven1024/interpret-ctl-execute

add execute mod for interpret command
This commit is contained in:
karmada-bot 2022-11-28 15:21:10 +08:00 committed by GitHub
commit eb3763c201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 827 additions and 25 deletions

4
go.mod
View File

@ -22,10 +22,12 @@ require (
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
go.uber.org/atomic v1.7.0
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
golang.org/x/tools v0.1.12
gomodules.xyz/jsonpatch/v2 v2.2.0
google.golang.org/grpc v1.47.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.25.4
k8s.io/apiextensions-apiserver v0.25.4
k8s.io/apimachinery v0.25.4
@ -159,7 +161,6 @@ require (
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/protobuf v1.28.0 // indirect
@ -168,7 +169,6 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/square/go-jose.v2 v2.2.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect

View File

@ -1,16 +1,156 @@
package interpret
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/kubectl/pkg/cmd/util"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/karmadactl/util/genericresource"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/configurableinterpreter"
)
func (o *Options) completeExecute(_ util.Factory, _ *cobra.Command, _ []string) []error {
return nil
func (o *Options) completeExecute(f util.Factory) []error {
var errs []error
if o.DesiredFile != "" {
o.DesiredResult = f.NewBuilder().
Unstructured().
FilenameParam(false, &resource.FilenameOptions{Filenames: []string{o.DesiredFile}}).
RequireObject(true).
Local().
Do()
errs = append(errs, o.DesiredResult.Err())
}
if o.ObservedFile != "" {
o.ObservedResult = f.NewBuilder().
Unstructured().
FilenameParam(false, &resource.FilenameOptions{Filenames: []string{o.ObservedFile}}).
RequireObject(true).
Local().
Do()
errs = append(errs, o.ObservedResult.Err())
}
if len(o.StatusFile) > 0 {
o.StatusResult = genericresource.NewBuilder().
Constructor(func() interface{} { return &workv1alpha2.AggregatedStatusItem{} }).
Filename(false, o.StatusFile).
Do()
errs = append(errs, o.StatusResult.Err())
}
return errs
}
func (o *Options) runExecute() error {
return fmt.Errorf("not implement")
if o.Operation == "" {
return fmt.Errorf("operation is not set for executing")
}
customizations, err := o.getCustomizationObject()
if err != nil {
return fmt.Errorf("fail to get customization object: %v", err)
}
desired, err := getUnstructuredObjectFromResult(o.DesiredResult)
if err != nil {
return fmt.Errorf("fail to get desired object: %v", err)
}
observed, err := getUnstructuredObjectFromResult(o.ObservedResult)
if err != nil {
return fmt.Errorf("fail to get observed object: %v", err)
}
status, err := o.getAggregatedStatusItems()
if err != nil {
return fmt.Errorf("fail to get status items: %v", err)
}
args := ruleArgs{
Desired: desired,
Observed: observed,
Status: status,
Replica: int64(o.DesiredReplica),
}
interpreter := configurableinterpreter.NewConfigurableInterpreter(nil)
interpreter.LoadConfig(customizations)
r := o.Rules.GetByOperation(o.Operation)
if r == nil {
// Shall never occur, because we validate it before.
return fmt.Errorf("operation %s is not supported. Use one of: %s", o.Operation, strings.Join(o.Rules.Names(), ", "))
}
result := r.Run(interpreter, args)
printExecuteResult(o.Out, o.ErrOut, r.Name(), result)
return nil
}
func printExecuteResult(w, errOut io.Writer, name string, result *ruleResult) {
if result.Err != nil {
fmt.Fprintf(errOut, "Execute %s error: %v\n", name, result.Err)
return
}
for i, res := range result.Results {
func() {
fmt.Fprintln(w, "---")
fmt.Fprintf(w, "# [%v/%v] %s:\n", i+1, len(result.Results), res.Name)
if err := printObjectYaml(w, res.Value); err != nil {
fmt.Fprintf(errOut, "ERROR: %v\n", err)
}
}()
}
}
// MarshalJSON doesn't work for yaml encoder, so unstructured.Unstructured and runtime.RawExtension objects
// will be encoded into unexpected data.
// Example1:
//
// &unstructured.Unstructured{
// Object: map[string]interface{}{
// "foo": "bar"
// },
// }
//
// will be encoded into:
//
// Object:
// foo: bar
//
// Example2:
//
// &runtime.RawExtension{
// Raw: []byte("{}"),
// }
//
// will be encoded into:
//
// raw:
// - 123
// - 125
//
// Inspired from https://github.com/kubernetes/kubernetes/blob/8fb423bfabe0d53934cc94c154c7da2dc3ce1332/staging/src/k8s.io/kubectl/pkg/cmd/get/get.go#L781-L786
// we convert it to map[string]interface{} by json, then encode the converted object to yaml.
func printObjectYaml(w io.Writer, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
var converted interface{}
err = json.Unmarshal(data, &converted)
if err != nil {
return err
}
encoder := yaml.NewEncoder(w)
defer encoder.Close()
return encoder.Encode(converted)
}

View File

@ -5,6 +5,7 @@ import (
"strings"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/cli-runtime/pkg/genericclioptions"
@ -13,9 +14,12 @@ import (
"k8s.io/kubectl/pkg/util/templates"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/karmadactl/options"
"github.com/karmada-io/karmada/pkg/karmadactl/util"
"github.com/karmada-io/karmada/pkg/karmadactl/util/genericresource"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
)
var (
@ -32,18 +36,30 @@ var (
interpretExample = templates.Examples(`
# Check the customizations in file
%[1]s interpret -f customization.json --check
# Execute the retention rule for
# Execute the retention rule
%[1]s interpret -f customization.yml --operation retain --desired-file desired.yml --observed-file observed.yml
# Execute the replicaRevision rule for
# Execute the replicaResource rule
%[1]s interpret -f customization.yml --operation interpretReplica --observed-file observed.yml
# Execute the replicaRevision rule
%[1]s interpret -f customization.yml --operation reviseReplica --observed-file observed.yml --desired-replica 2
# Execute the statusReflection rule for
# Execute the statusReflection rule
%[1]s interpret -f customization.yml --operation interpretStatus --observed-file observed.yml
# Execute the healthInterpretation rule
%[1]s interpret -f customization.yml --operation interpretHealth --observed-file observed.yml
# Execute the dependencyInterpretation rule
%[1]s interpret -f customization.yml --operation interpretDependency --observed-file observed.yml
# Execute the statusAggregation rule
%[1]s interpret -f customization.yml --operation aggregateStatus --status-file status1.yml --status-file status2.yml
%[1]s interpret -f customization.yml --operation aggregateStatus --observed-file observed.yml --status-file status.yml
# Fetch observed object from url, and status items from stdin (specified with -)
%[1]s interpret -f customization.yml --operation aggregateStatus --observed-file https://example.com/observed.yml --status-file -
`)
)
@ -81,7 +97,7 @@ func NewCmdInterpret(f util.Factory, parentCommand string, streams genericcliopt
flags.BoolVar(&o.Check, "check", false, "Validates the given ResourceInterpreterCustomization configuration(s)")
flags.StringVar(&o.DesiredFile, "desired-file", o.DesiredFile, "Filename, directory, or URL to files identifying the resource to use as desiredObj argument in rule script.")
flags.StringVar(&o.ObservedFile, "observed-file", o.ObservedFile, "Filename, directory, or URL to files identifying the resource to use as observedObj argument in rule script.")
flags.StringSliceVar(&o.StatusFile, "status-file", o.StatusFile, "Filename, directory, or URL to files identifying the resource to use as statusItems argument in rule script.")
flags.StringVar(&o.StatusFile, "status-file", o.StatusFile, "Filename, directory, or URL to files identifying the resource to use as statusItems argument in rule script.")
flags.Int32Var(&o.DesiredReplica, "desired-replica", o.DesiredReplica, "The desiredReplica argument in rule script.")
cmdutil.AddJsonFilenameFlag(flags, &o.FilenameOptions.Filenames, "Filename, directory, or URL to files containing the customizations")
flags.BoolVarP(&o.FilenameOptions.Recursive, "recursive", "R", false, "Process the directory used in -f, --filename recursively. Useful when you want to manage related manifests organized within the same directory.")
@ -99,10 +115,13 @@ type Options struct {
// args
DesiredFile string
ObservedFile string
StatusFile []string
StatusFile string
DesiredReplica int32
CustomizationResult *resource.Result
DesiredResult *resource.Result
ObservedResult *resource.Result
StatusResult *genericresource.Result
Rules Rules
@ -122,12 +141,18 @@ func (o *Options) Complete(f util.Factory, cmd *cobra.Command, args []string) er
var errs []error
errs = append(errs, o.CustomizationResult.Err())
errs = append(errs, o.completeExecute(f, cmd, args)...)
errs = append(errs, o.completeExecute(f)...)
return errors.NewAggregate(errs)
}
// Validate checks the EditOptions to see if there is sufficient information to run the command.
// Validate validates Options.
func (o *Options) Validate() error {
if o.Operation != "" {
r := o.Rules.GetByOperation(o.Operation)
if r == nil {
return fmt.Errorf("operation %s is not supported. Use one of: %s", o.Operation, strings.Join(o.Rules.Names(), ", "))
}
}
return nil
}
@ -141,6 +166,56 @@ func (o *Options) Run() error {
}
}
func (o *Options) getCustomizationObject() ([]*configv1alpha1.ResourceInterpreterCustomization, error) {
infos, err := o.CustomizationResult.Infos()
if err != nil {
return nil, err
}
customizations := make([]*configv1alpha1.ResourceInterpreterCustomization, len(infos))
for i, info := range infos {
c, err := asResourceInterpreterCustomization(info.Object)
if err != nil {
return nil, err
}
customizations[i] = c
}
return customizations, nil
}
func (o *Options) getAggregatedStatusItems() ([]workv1alpha2.AggregatedStatusItem, error) {
if o.StatusResult == nil {
return nil, nil
}
objs, err := o.StatusResult.Objects()
if err != nil {
return nil, err
}
items := make([]workv1alpha2.AggregatedStatusItem, len(objs))
for i, obj := range objs {
items[i] = *(obj.(*workv1alpha2.AggregatedStatusItem))
}
return items, nil
}
func getUnstructuredObjectFromResult(result *resource.Result) (*unstructured.Unstructured, error) {
if result == nil {
return nil, nil
}
infos, err := result.Infos()
if err != nil {
return nil, err
}
if len(infos) > 1 {
return nil, fmt.Errorf("get %v objects, expect one at most", len(infos))
}
return helper.ToUnstructured(infos[0].Object)
}
func asResourceInterpreterCustomization(o runtime.Object) (*configv1alpha1.ResourceInterpreterCustomization, error) {
c, ok := o.(*configv1alpha1.ResourceInterpreterCustomization)
if !ok {

View File

@ -2,6 +2,7 @@ package interpret
import (
"fmt"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -218,14 +219,19 @@ func (s *statusAggregationRule) Run(interpreter *configurableinterpreter.Configu
if err != nil {
return newRuleResultWithError(err)
}
aggregateStatus, enabled, err := interpreter.AggregateStatus(obj, args.Status)
status := args.Status
if status == nil {
status = []workv1alpha2.AggregatedStatusItem{}
}
aggregateStatus, enabled, err := interpreter.AggregateStatus(obj, status)
if err != nil {
return newRuleResultWithError(err)
}
if !enabled {
return newRuleResultWithError(fmt.Errorf("rule is not enabled"))
}
return newRuleResult().add("aggregateStatus", aggregateStatus)
return newRuleResult().add("aggregatedStatus", aggregateStatus)
}
type healthInterpretationRule struct {
@ -334,6 +340,21 @@ func (r Rules) Names() []string {
return names
}
// GetByOperation returns the matched rule by operation name, ignoring case. Return nil if none is matched.
func (r Rules) GetByOperation(operation string) Rule {
if operation == "" {
return nil
}
operation = strings.ToLower(operation)
for _, rule := range r {
ruleName := strings.ToLower(rule.Name())
if ruleName == operation {
return rule
}
}
return nil
}
// Get returns the rule with the name. If not found, return nil.
func (r Rules) Get(name string) Rule {
for _, rr := range r {
@ -367,10 +388,10 @@ func (r ruleArgs) getObservedObjectOrError() (*unstructured.Unstructured, error)
func (r ruleArgs) getObjectOrError() (*unstructured.Unstructured, error) {
if r.Desired == nil && r.Observed == nil {
return nil, fmt.Errorf("desired, desired-file, observed, observed-file options are not set")
return nil, fmt.Errorf("desired-file, observed-file options are not set")
}
if r.Desired != nil && r.Observed != nil {
return nil, fmt.Errorf("you can not specify multiple object by desired, desired-file, observed, observed-file options")
return nil, fmt.Errorf("you can not specify both desired-file and observed-file options")
}
if r.Desired != nil {
return r.Desired, nil

View File

@ -0,0 +1,167 @@
package genericresource
import (
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/cli-runtime/pkg/resource"
)
const defaultHTTPGetAttempts int = 3
var defaultNewFunc = func() interface{} {
return map[string]interface{}{}
}
var errMissingResource = fmt.Errorf(`you must provide one or more resources`)
// Builder provides convenience functions for taking arguments and parameters
// from the command line and converting them to a list of resources to iterate
// over using the Visitor interface.
type Builder struct {
errs []error
paths []Visitor
stdinInUse bool
mapper *mapper
schema resource.ContentValidator
}
// NewBuilder returns a Builder.
func NewBuilder() *Builder {
return &Builder{
mapper: &mapper{
newFunc: defaultNewFunc,
},
}
}
// Schema set the schema to validate data in files.
func (b *Builder) Schema(schema resource.ContentValidator) *Builder {
b.schema = schema
return b
}
// Constructor tells wanted type of object.
func (b *Builder) Constructor(newFunc func() interface{}) *Builder {
b.mapper.newFunc = newFunc
return b
}
// Filename groups input in two categories: URLs and files (files, directories, STDIN)
func (b *Builder) Filename(recursive bool, filenames ...string) *Builder {
for _, s := range filenames {
switch {
case s == "-":
b.Stdin()
case strings.HasPrefix(s, "http://") || strings.HasPrefix(s, "https://"):
u, err := url.Parse(s)
if err != nil {
b.errs = append(b.errs, fmt.Errorf("the URL passed to filename %q is not valid: %v", s, err))
continue
}
b.URL(defaultHTTPGetAttempts, u)
default:
matches, err := expandIfFilePattern(s)
if err != nil {
b.errs = append(b.errs, err)
continue
}
b.Path(recursive, matches...)
}
}
return b
}
// Stdin will read objects from the standard input.
func (b *Builder) Stdin() *Builder {
if b.stdinInUse {
b.errs = append(b.errs, resource.StdinMultiUseError)
}
b.stdinInUse = true
b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.schema))
return b
}
// URL accepts a number of URLs directly.
func (b *Builder) URL(httpAttemptCount int, urls ...*url.URL) *Builder {
for _, u := range urls {
b.paths = append(b.paths, NewURLVisitor(b.mapper, httpAttemptCount, u, b.schema))
}
return b
}
// Path accepts a set of paths that may be files, directories (all can contain
// one or more resources). Creates a FileVisitor for each file and then each
// FileVisitor is streaming the content to a StreamVisitor.
func (b *Builder) Path(recursive bool, paths ...string) *Builder {
for _, p := range paths {
_, err := os.Stat(p)
if os.IsNotExist(err) {
b.errs = append(b.errs, fmt.Errorf("the path %q does not exist", p))
continue
}
if err != nil {
b.errs = append(b.errs, fmt.Errorf("the path %q cannot be accessed: %v", p, err))
continue
}
visitors, err := ExpandPathsToFileVisitors(b.mapper, p, recursive, resource.FileExtensions, b.schema)
if err != nil {
b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err))
}
b.paths = append(b.paths, visitors...)
}
if len(b.paths) == 0 && len(b.errs) == 0 {
b.errs = append(b.errs, fmt.Errorf("error reading %v: recognized file extensions are %v", paths, resource.FileExtensions))
}
return b
}
// Do returns a Result object with a Visitor for the resources identified by the Builder. Note that stream
// inputs are consumed by the first execution - use Infos() or Objects() on the Result to capture a list
// for further iteration.
func (b *Builder) Do() *Result {
r := b.visitorResult()
return r
}
func (b *Builder) visitorResult() *Result {
if len(b.errs) > 0 {
return &Result{err: utilerrors.NewAggregate(b.errs)}
}
// visit items specified by paths
if len(b.paths) != 0 {
return b.visitByPaths()
}
return &Result{err: errMissingResource}
}
func (b *Builder) visitByPaths() *Result {
result := &Result{}
result.visitor = VisitorList(b.paths)
return result
}
// expandIfFilePattern returns all the filenames that match the input pattern
// or the filename if it is a specific filename and not a pattern.
// If the input is a pattern and it yields no result it will result in an error.
func expandIfFilePattern(pattern string) ([]string, error) {
if _, err := os.Stat(pattern); os.IsNotExist(err) {
matches, err := filepath.Glob(pattern)
if err == nil && len(matches) == 0 {
return nil, fmt.Errorf("the path %q does not exist", pattern)
}
if err == filepath.ErrBadPattern {
return nil, fmt.Errorf("pattern %q is not valid: %v", pattern, err)
}
return matches, err
}
return []string{pattern}, nil
}

View File

@ -0,0 +1,3 @@
// Package genericresource is modified from "k8s.io/cli-runtime/pkg/resource".
// It can fetch any object (not only runtime.object) from file, http, and stdin.
package genericresource

View File

@ -0,0 +1,13 @@
package genericresource
// Visitor lets clients walk a list of resources.
type Visitor interface {
Visit(VisitorFunc) error
}
// VisitorFunc implements the Visitor interface for a matching function.
// If there was a problem walking a list of resources, the incoming error
// will describe the problem and the function can decide how to handle that error.
// A nil returned indicates to accept an error to continue loops even when errors happen.
// This is useful for ignoring certain kinds of errors or aggregating errors in some way.
type VisitorFunc func(*Info, error) error

View File

@ -0,0 +1,74 @@
package genericresource
import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)
// Result contains helper methods for dealing with the outcome of a Builder.
type Result struct {
err error
visitor Visitor
// populated by a call to Infos
info []*Info
ignoreErrors []utilerrors.Matcher
}
// Err returns one or more errors (via a util.ErrorList) that occurred prior
// to visiting the elements in the visitor. To see all errors including those
// that occur during visitation, invoke Infos().
func (r *Result) Err() error {
return r.err
}
// Objects returns the list of objects of all found resources.
func (r *Result) Objects() ([]interface{}, error) {
infos, err := r.Infos()
if err != nil {
return nil, err
}
objects := make([]interface{}, len(infos))
for i, info := range infos {
objects[i] = info.Object
}
return objects, err
}
// Infos returns an array of all of the resource infos retrieved via traversal.
// Will attempt to traverse the entire set of visitors only once, and will return
// a cached list on subsequent calls.
func (r *Result) Infos() ([]*Info, error) {
if r.err != nil {
return nil, r.err
}
if r.info != nil {
return r.info, nil
}
var infos []*Info
err := r.visitor.Visit(func(info *Info, err error) error {
if err != nil {
return err
}
infos = append(infos, info)
return nil
})
err = utilerrors.FilterOut(err, r.ignoreErrors...)
r.info, r.err = infos, err
return infos, err
}
// Visit implements the Visitor interface on the items described in the Builder.
// Note that some visitor sources are not traversable more than once, or may
// return different results. If you wish to operate on the same set of resources
// multiple times, use the Infos() method.
func (r *Result) Visit(fn VisitorFunc) error {
if r.err != nil {
return r.err
}
err := r.visitor.Visit(fn)
return utilerrors.FilterOut(err, r.ignoreErrors...)
}

View File

@ -0,0 +1,294 @@
package genericresource
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"time"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/cli-runtime/pkg/resource"
)
const (
constSTDINstr = "STDIN"
)
// VisitorList implements Visit for the sub visitors it contains. The first error
// returned from a child Visitor will terminate iteration.
type VisitorList []Visitor
// Visit implements Visitor
func (l VisitorList) Visit(fn VisitorFunc) error {
for i := range l {
if err := l[i].Visit(fn); err != nil {
return err
}
}
return nil
}
// URLVisitor downloads the contents of a URL, and if successful, returns
// an info object representing the downloaded object.
type URLVisitor struct {
URL *url.URL
*StreamVisitor
HTTPAttemptCount int
}
// NewURLVisitor returns a visitor to download from given url. It will max retry "httpAttemptCount" when failed.
func NewURLVisitor(mapper *mapper, httpAttemptCount int, u *url.URL, schema resource.ContentValidator) *URLVisitor {
return &URLVisitor{
URL: u,
StreamVisitor: NewStreamVisitor(nil, mapper, u.String(), schema),
HTTPAttemptCount: httpAttemptCount,
}
}
// Visit down object from url.
func (v *URLVisitor) Visit(fn VisitorFunc) error {
body, err := readHTTPWithRetries(httpgetImpl, time.Second, v.URL.String(), v.HTTPAttemptCount)
if err != nil {
return err
}
defer body.Close()
v.StreamVisitor.Reader = body
return v.StreamVisitor.Visit(fn)
}
func ignoreFile(path string, extensions []string) bool {
if len(extensions) == 0 {
return false
}
ext := filepath.Ext(path)
for _, s := range extensions {
if s == ext {
return false
}
}
return true
}
// FileVisitorForSTDIN return a special FileVisitor just for STDIN
func FileVisitorForSTDIN(mapper *mapper, schema resource.ContentValidator) Visitor {
return &FileVisitor{
Path: constSTDINstr,
StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, schema),
}
}
// ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path.
// After FileVisitors open the files, they will pass an io.Reader to a StreamVisitor to do the reading. (stdin
// is also taken care of). Paths argument also accepts a single file, and will return a single visitor
func ExpandPathsToFileVisitors(mapper *mapper, paths string, recursive bool, extensions []string, schema resource.ContentValidator) ([]Visitor, error) {
var visitors []Visitor
err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.IsDir() {
if path != paths && !recursive {
return filepath.SkipDir
}
return nil
}
// Don't check extension if the filepath was passed explicitly
if path != paths && ignoreFile(path, extensions) {
return nil
}
visitor := &FileVisitor{
Path: path,
StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
}
visitors = append(visitors, visitor)
return nil
})
if err != nil {
return nil, err
}
return visitors, nil
}
// FileVisitor is wrapping around a StreamVisitor, to handle open/close files
type FileVisitor struct {
Path string
*StreamVisitor
}
// Visit in a FileVisitor is just taking care of opening/closing files
func (v *FileVisitor) Visit(fn VisitorFunc) error {
var f *os.File
if v.Path == constSTDINstr {
f = os.Stdin
} else {
var err error
f, err = os.Open(v.Path)
if err != nil {
return err
}
defer f.Close()
}
// TODO: Consider adding a flag to force to UTF16, apparently some
// Windows tools don't write the BOM
utf16bom := unicode.BOMOverride(unicode.UTF8.NewDecoder())
v.StreamVisitor.Reader = transform.NewReader(f, utf16bom)
return v.StreamVisitor.Visit(fn)
}
// StreamVisitor reads objects from an io.Reader and walks them. A stream visitor can only be
// visited once.
// Unmarshal stream to object by json.
type StreamVisitor struct {
io.Reader
*mapper
Source string
Schema resource.ContentValidator
}
// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same.
func NewStreamVisitor(r io.Reader, mapper *mapper, source string, schema resource.ContentValidator) *StreamVisitor {
return &StreamVisitor{
Reader: r,
mapper: mapper,
Source: source,
Schema: schema,
}
}
// Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream.
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
for {
ext := runtime.RawExtension{}
if err := d.Decode(&ext); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("error parsing %s: %v", v.Source, err)
}
// TODO: This needs to be able to handle object in other encodings and schemas.
ext.Raw = bytes.TrimSpace(ext.Raw)
if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
continue
}
if err := resource.ValidateSchema(ext.Raw, v.Schema); err != nil {
return fmt.Errorf("error validating %q: %v", v.Source, err)
}
info, err := v.infoForData(ext.Raw, v.Source)
if err != nil {
if fnErr := fn(info, err); fnErr != nil {
return fnErr
}
continue
}
if err = fn(info, nil); err != nil {
return err
}
}
}
type mapper struct {
newFunc func() interface{}
}
func (m *mapper) infoForData(data []byte, source string) (*Info, error) {
info := &Info{
Source: source,
Data: data,
Object: m.newFunc(),
}
err := json.Unmarshal(data, info.Object)
if err != nil {
return nil, err
}
return info, nil
}
// httpget Defines function to retrieve a url and return the results. Exists for unit test stubbing.
type httpget func(url string) (int, string, io.ReadCloser, error)
// httpgetImpl Implements a function to retrieve a url and return the results.
func httpgetImpl(url string) (int, string, io.ReadCloser, error) {
// nolint:gosec
resp, err := http.Get(url)
if err != nil {
return 0, "", nil, err
}
return resp.StatusCode, resp.Status, resp.Body, nil
}
// readHTTPWithRetries tries to http.Get the v.URL retries times before giving up.
func readHTTPWithRetries(get httpget, duration time.Duration, u string, attempts int) (io.ReadCloser, error) {
var err error
if attempts <= 0 {
return nil, fmt.Errorf("http attempts must be greater than 0, was %d", attempts)
}
for i := 0; i < attempts; i++ {
var (
statusCode int
status string
body io.ReadCloser
)
if i > 0 {
time.Sleep(duration)
}
// Try to get the URL
statusCode, status, body, err = get(u)
// Retry Errors
if err != nil {
continue
}
if statusCode == http.StatusOK {
return body, nil
}
body.Close()
// Error - Set the error condition from the StatusCode
err = fmt.Errorf("unable to read URL %q, server reported %s, status code=%d", u, status, statusCode)
if statusCode >= 500 && statusCode < 600 {
// Retry 500's
continue
} else {
// Don't retry other StatusCodes
break
}
}
return nil, err
}
// Info contains temporary info to execute a REST call, or show the results
// of an already completed REST call.
type Info struct {
// Optional, Source is the filename or URL to template file (.json or .yaml),
// or stdin to use to handle the resource
Source string
// Optional, this is the most recent value returned by the server if available. It will
// typically be in unstructured or internal forms, depending on how the Builder was
// defined. If retrieved from the server, the Builder expects the mapping client to
// decide the final form. Use the AsVersioned, AsUnstructured, and AsInternal helpers
// to alter the object versions.
// If Subresource is specified, this will be the object for the subresource.
Data []byte
Object interface{}
}

View File

@ -27,6 +27,7 @@ var resourceInterpreterCustomizationsGVR = schema.GroupVersionResource{
type ConfigManager interface {
LuaScriptAccessors() map[schema.GroupVersionKind]CustomAccessor
HasSynced() bool
LoadConfig(customizations []*configv1alpha1.ResourceInterpreterCustomization)
}
// interpreterConfigManager collects the resource interpreter customization.
@ -61,19 +62,24 @@ func (configManager *interpreterConfigManager) HasSynced() bool {
// NewInterpreterConfigManager watches ResourceInterpreterCustomization and organizes
// the configurations in the cache.
func NewInterpreterConfigManager(inform genericmanager.SingleClusterInformerManager) ConfigManager {
func NewInterpreterConfigManager(informer genericmanager.SingleClusterInformerManager) ConfigManager {
manager := &interpreterConfigManager{
lister: inform.Lister(resourceInterpreterCustomizationsGVR),
initialSynced: &atomic.Value{},
configuration: &atomic.Value{},
}
manager.configuration.Store(make(map[schema.GroupVersionKind]CustomAccessor))
manager.initialSynced.Store(false)
configHandlers := fedinformer.NewHandlerOnEvents(
func(_ interface{}) { manager.updateConfiguration() },
func(_, _ interface{}) { manager.updateConfiguration() },
func(_ interface{}) { manager.updateConfiguration() })
inform.ForResource(resourceInterpreterCustomizationsGVR, configHandlers)
// In interpret command, rules are not loaded from server, so we don't start informer for it.
if informer != nil {
manager.lister = informer.Lister(resourceInterpreterCustomizationsGVR)
configHandlers := fedinformer.NewHandlerOnEvents(
func(_ interface{}) { manager.updateConfiguration() },
func(_, _ interface{}) { manager.updateConfiguration() },
func(_ interface{}) { manager.updateConfiguration() })
informer.ForResource(resourceInterpreterCustomizationsGVR, configHandlers)
}
return manager
}
@ -94,6 +100,10 @@ func (configManager *interpreterConfigManager) updateConfiguration() {
configs[index] = config
}
configManager.LoadConfig(configs)
}
func (configManager *interpreterConfigManager) LoadConfig(configs []*configv1alpha1.ResourceInterpreterCustomization) {
sort.Slice(configs, func(i, j int) bool {
return configs[i].Name < configs[j].Name
})

View File

@ -145,3 +145,8 @@ func (c *ConfigurableInterpreter) getInterpreter(kind schema.GroupVersionKind, o
}
return script, len(script) > 0
}
// LoadConfig loads and stores rules from customizations
func (c *ConfigurableInterpreter) LoadConfig(customizations []*configv1alpha1.ResourceInterpreterCustomization) {
c.configManager.LoadConfig(customizations)
}