mirror of https://github.com/fluxcd/cli-utils.git
Merge branch 'master' into E2ETests
This commit is contained in:
commit
382874eda8
6
Makefile
6
Makefile
|
|
@ -10,6 +10,12 @@ export PATH := $(MYGOBIN):$(PATH)
|
|||
|
||||
all: generate license fix vet fmt test lint tidy
|
||||
|
||||
# The following target intended for reference by a file in
|
||||
# https://github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/cli-utils
|
||||
.PHONY: prow-presubmit-check
|
||||
prow-presubmit-check: \
|
||||
test
|
||||
|
||||
fix:
|
||||
go fix ./...
|
||||
|
||||
|
|
|
|||
|
|
@ -22,17 +22,12 @@ func NewCmdApply(f util.Factory, ioStreams genericclioptions.IOStreams) *cobra.C
|
|||
}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "apply (-f FILENAME | -k DIRECTORY)",
|
||||
Use: "apply (FILENAME... | DIRECTORY)",
|
||||
DisableFlagsInUseLine: true,
|
||||
Short: i18n.T("Apply a configuration to a resource by filename or stdin"),
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
if len(args) > 0 {
|
||||
// check is kustomize, if so update
|
||||
applier.ApplyOptions.DeleteFlags.FileNameFlags.Kustomize = &args[0]
|
||||
}
|
||||
|
||||
cmdutil.CheckErr(applier.Initialize(cmd))
|
||||
paths := args
|
||||
cmdutil.CheckErr(applier.Initialize(cmd, paths))
|
||||
|
||||
// Create a context with the provided timout from the cobra parameter.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), applier.StatusOptions.Timeout)
|
||||
|
|
@ -47,15 +42,15 @@ func NewCmdApply(f util.Factory, ioStreams genericclioptions.IOStreams) *cobra.C
|
|||
},
|
||||
}
|
||||
|
||||
applier.SetFlags(cmd)
|
||||
|
||||
cmdutil.CheckErr(applier.SetFlags(cmd))
|
||||
cmdutil.AddValidateFlags(cmd)
|
||||
_ = cmd.Flags().MarkHidden("validate")
|
||||
|
||||
cmd.Flags().BoolVar(&applier.NoPrune, "no-prune", applier.NoPrune, "If true, do not prune previously applied objects.")
|
||||
cmd.Flags().BoolVar(&applier.DryRun, "dry-run", applier.DryRun,
|
||||
"If true, only print the object that would be sent, without sending it. Warning: --dry-run cannot accurately output the result "+
|
||||
"of merging the local manifest and the server-side data. Use --server-dry-run to get the merged result instead.")
|
||||
cmd.Flags().BoolVar(&applier.ApplyOptions.ServerDryRun, "server-dry-run", applier.ApplyOptions.ServerDryRun,
|
||||
"If true, request will be sent to server with dry-run flag, which means the modifications won't be persisted. This is an alpha feature and flag.")
|
||||
// Necessary because ApplyOptions depends on it--hidden.
|
||||
cmd.Flags().BoolVar(&applier.DryRun, "dry-run", applier.DryRun, "NOT USED")
|
||||
_ = cmd.Flags().MarkHidden("dry-run")
|
||||
|
||||
cmdutil.AddServerSideApplyFlags(cmd)
|
||||
|
||||
return cmd
|
||||
|
|
|
|||
|
|
@ -44,8 +44,10 @@ func NewCmdDestroy(f util.Factory, ioStreams genericclioptions.IOStreams) *cobra
|
|||
destroyer.SetFlags(cmd)
|
||||
|
||||
cmdutil.AddValidateFlags(cmd)
|
||||
cmd.Flags().BoolVar(&destroyer.DryRun, "dry-run", destroyer.DryRun,
|
||||
"If true, only print the object that would be sent and action which would be performed, without performing it.")
|
||||
_ = cmd.Flags().MarkHidden("validate")
|
||||
|
||||
cmd.Flags().BoolVar(&destroyer.DryRun, "dry-run", destroyer.DryRun, "If true, only print the objects that would be deleted, without performing it.")
|
||||
|
||||
cmdutil.AddServerSideApplyFlags(cmd)
|
||||
return cmd
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,15 +27,10 @@ func NewCmdPreview(f util.Factory, ioStreams genericclioptions.IOStreams) *cobra
|
|||
Short: i18n.T("Preview the apply of a configuration"),
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
if len(args) > 0 {
|
||||
// check is kustomize, if so update
|
||||
applier.ApplyOptions.DeleteFlags.FileNameFlags.Kustomize = &args[0]
|
||||
}
|
||||
|
||||
// Set DryRun option true before Initialize. DryRun is propagated to
|
||||
// ApplyOptions and PruneOptions in Initialize.
|
||||
applier.DryRun = true
|
||||
cmdutil.CheckErr(applier.Initialize(cmd))
|
||||
cmdutil.CheckErr(applier.Initialize(cmd, args))
|
||||
|
||||
// Create a context with the provided timout from the cobra parameter.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), applier.StatusOptions.Timeout)
|
||||
|
|
@ -50,11 +45,12 @@ func NewCmdPreview(f util.Factory, ioStreams genericclioptions.IOStreams) *cobra
|
|||
},
|
||||
}
|
||||
|
||||
applier.SetFlags(cmd)
|
||||
|
||||
cmdutil.CheckErr(applier.SetFlags(cmd))
|
||||
cmdutil.AddValidateFlags(cmd)
|
||||
_ = cmd.Flags().MarkHidden("validate")
|
||||
|
||||
cmd.Flags().BoolVar(&applier.NoPrune, "no-prune", applier.NoPrune, "If true, do not prune previously applied objects.")
|
||||
// Necessary because ApplyOptions depends on it--not used.
|
||||
// Necessary because ApplyOptions depends on it--hidden.
|
||||
cmd.Flags().BoolVar(&applier.DryRun, "dry-run", applier.DryRun, "NOT USED")
|
||||
_ = cmd.Flags().MarkHidden("dry-run")
|
||||
|
||||
|
|
|
|||
|
|
@ -61,12 +61,14 @@ type Applier struct {
|
|||
// Initialize sets up the Applier for actually doing an apply against
|
||||
// a cluster. This involves validating command line inputs and configuring
|
||||
// clients for communicating with the cluster.
|
||||
func (a *Applier) Initialize(cmd *cobra.Command) error {
|
||||
a.ApplyOptions.PreProcessorFn = prune.PrependGroupingObject(a.ApplyOptions)
|
||||
func (a *Applier) Initialize(cmd *cobra.Command, paths []string) error {
|
||||
fileNameFlags := processPaths(paths)
|
||||
a.ApplyOptions.DeleteFlags.FileNameFlags = &fileNameFlags
|
||||
err := a.ApplyOptions.Complete(a.factory, cmd)
|
||||
if err != nil {
|
||||
return errors.WrapPrefix(err, "error setting up ApplyOptions", 1)
|
||||
}
|
||||
a.ApplyOptions.PreProcessorFn = prune.PrependGroupingObject(a.ApplyOptions)
|
||||
err = a.PruneOptions.Initialize(a.factory, a.ApplyOptions.Namespace)
|
||||
if err != nil {
|
||||
return errors.WrapPrefix(err, "error setting up PruneOptions", 1)
|
||||
|
|
@ -87,12 +89,23 @@ func (a *Applier) Initialize(cmd *cobra.Command) error {
|
|||
// SetFlags configures the command line flags needed for apply and
|
||||
// status. This is a temporary solution as we should separate the configuration
|
||||
// of cobra flags from the Applier.
|
||||
func (a *Applier) SetFlags(cmd *cobra.Command) {
|
||||
func (a *Applier) SetFlags(cmd *cobra.Command) error {
|
||||
a.ApplyOptions.DeleteFlags.AddFlags(cmd)
|
||||
for _, flag := range []string{"kustomize", "filename", "recursive"} {
|
||||
err := cmd.Flags().MarkHidden(flag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
a.ApplyOptions.RecordFlags.AddFlags(cmd)
|
||||
a.ApplyOptions.PrintFlags.AddFlags(cmd)
|
||||
_ = cmd.Flags().MarkHidden("cascade")
|
||||
_ = cmd.Flags().MarkHidden("force")
|
||||
_ = cmd.Flags().MarkHidden("grace-period")
|
||||
_ = cmd.Flags().MarkHidden("timeout")
|
||||
_ = cmd.Flags().MarkHidden("wait")
|
||||
a.StatusOptions.AddFlags(cmd)
|
||||
a.ApplyOptions.Overwrite = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// newResolver sets up a new Resolver for computing status. The configuration
|
||||
|
|
|
|||
|
|
@ -70,6 +70,11 @@ func (d *Destroyer) Run() <-chan event.Event {
|
|||
go func() {
|
||||
defer close(ch)
|
||||
infos, _ := d.ApplyOptions.GetObjects()
|
||||
// Clear the data/inventory section of the grouping object configmap,
|
||||
// so the prune will calculate the prune set as all the objects,
|
||||
// deleting everything. We can ignore the error, since the Prune
|
||||
// will catch the same problems.
|
||||
_ = prune.ClearGroupingObj(infos)
|
||||
err := d.PruneOptions.Prune(infos, ch)
|
||||
if err != nil {
|
||||
// If we see an error here we just report it on the channel and then
|
||||
|
|
@ -92,6 +97,10 @@ func (d *Destroyer) Run() <-chan event.Event {
|
|||
// of cobra flags from the Destroyer.
|
||||
func (d *Destroyer) SetFlags(cmd *cobra.Command) {
|
||||
d.ApplyOptions.DeleteFlags.AddFlags(cmd)
|
||||
d.ApplyOptions.RecordFlags.AddFlags(cmd)
|
||||
_ = cmd.Flags().MarkHidden("cascade")
|
||||
_ = cmd.Flags().MarkHidden("force")
|
||||
_ = cmd.Flags().MarkHidden("grace-period")
|
||||
_ = cmd.Flags().MarkHidden("timeout")
|
||||
_ = cmd.Flags().MarkHidden("wait")
|
||||
d.ApplyOptions.Overwrite = true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
package apply
|
||||
|
||||
import (
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
)
|
||||
|
||||
func processPaths(paths []string) genericclioptions.FileNameFlags {
|
||||
// No arguments means we are reading from StdIn
|
||||
fileNameFlags := genericclioptions.FileNameFlags{}
|
||||
if len(paths) == 0 {
|
||||
fileNames := []string{"-"}
|
||||
fileNameFlags.Filenames = &fileNames
|
||||
return fileNameFlags
|
||||
}
|
||||
|
||||
t := true
|
||||
fileNameFlags.Filenames = &paths
|
||||
fileNameFlags.Recursive = &t
|
||||
return fileNameFlags
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
package apply
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"gotest.tools/assert"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
)
|
||||
|
||||
func TestProcessPaths(t *testing.T) {
|
||||
trueVal := true
|
||||
testCases := map[string]struct {
|
||||
paths []string
|
||||
expectedFileNameFlags genericclioptions.FileNameFlags
|
||||
}{
|
||||
"empty slice means reading from StdIn": {
|
||||
paths: []string{},
|
||||
expectedFileNameFlags: genericclioptions.FileNameFlags{
|
||||
Filenames: &[]string{"-"},
|
||||
},
|
||||
},
|
||||
"single element in slice means reading from that file/path": {
|
||||
paths: []string{"object.yaml"},
|
||||
expectedFileNameFlags: genericclioptions.FileNameFlags{
|
||||
Filenames: &[]string{"object.yaml"},
|
||||
Recursive: &trueVal,
|
||||
},
|
||||
},
|
||||
"multiple elements in slice means reading from all files": {
|
||||
paths: []string{"rs.yaml", "dep.yaml"},
|
||||
expectedFileNameFlags: genericclioptions.FileNameFlags{
|
||||
Filenames: &[]string{"rs.yaml", "dep.yaml"},
|
||||
Recursive: &trueVal,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for tn, tc := range testCases {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
fileNameFlags := processPaths(tc.paths)
|
||||
|
||||
assert.DeepEqual(t, tc.expectedFileNameFlags, fileNameFlags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -224,6 +224,39 @@ func RetrieveInventoryFromGroupingObj(infos []*resource.Info) ([]*ObjMetadata, e
|
|||
return inventory, nil
|
||||
}
|
||||
|
||||
// ClearGroupingObj finds the grouping object in the list of objects,
|
||||
// and sets an empty inventory. Returns error if the grouping object
|
||||
// is not Unstructured, the grouping object does not exist, or if
|
||||
// we can't set the empty inventory on the grouping object. If successful,
|
||||
// returns nil.
|
||||
func ClearGroupingObj(infos []*resource.Info) error {
|
||||
// Initially, find the grouping object ConfigMap (in Unstructured format).
|
||||
var groupingObj *unstructured.Unstructured
|
||||
for _, info := range infos {
|
||||
obj := info.Object
|
||||
if IsGroupingObject(obj) {
|
||||
var ok bool
|
||||
groupingObj, ok = obj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return fmt.Errorf("grouping object is not an Unstructured: %#v", groupingObj)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if groupingObj == nil {
|
||||
return fmt.Errorf("grouping object not found")
|
||||
}
|
||||
// Clears the inventory map of the ConfigMap "data" section.
|
||||
emptyMap := map[string]string{}
|
||||
err := unstructured.SetNestedStringMap(groupingObj.UnstructuredContent(),
|
||||
emptyMap, "data")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// calcInventoryHash returns an unsigned int32 representing the hash
|
||||
// of the inventory strings. If there is an error writing bytes to
|
||||
// the hash, then the error is returned; nil is returned otherwise.
|
||||
|
|
|
|||
|
|
@ -581,6 +581,65 @@ func TestAddSuffixToName(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClearGroupingObject(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
infos []*resource.Info
|
||||
isError bool
|
||||
}{
|
||||
"Empty infos should error": {
|
||||
infos: []*resource.Info{},
|
||||
isError: true,
|
||||
},
|
||||
"Non-Unstructured grouping object should error": {
|
||||
infos: []*resource.Info{nonUnstructuredGroupingInfo},
|
||||
isError: true,
|
||||
},
|
||||
"Info with nil Object should error": {
|
||||
infos: []*resource.Info{nilInfo},
|
||||
isError: true,
|
||||
},
|
||||
"Single grouping object should work": {
|
||||
infos: []*resource.Info{copyGroupingInfo()},
|
||||
isError: false,
|
||||
},
|
||||
"Single non-grouping object should error": {
|
||||
infos: []*resource.Info{pod1Info},
|
||||
isError: true,
|
||||
},
|
||||
"Multiple non-grouping objects should error": {
|
||||
infos: []*resource.Info{pod1Info, pod2Info},
|
||||
isError: true,
|
||||
},
|
||||
"Grouping object with single inventory object should work": {
|
||||
infos: []*resource.Info{copyGroupingInfo(), pod1Info},
|
||||
isError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
err := ClearGroupingObj(tc.infos)
|
||||
if tc.isError {
|
||||
if err == nil {
|
||||
t.Errorf("Should have produced an error, but returned none.")
|
||||
}
|
||||
}
|
||||
if !tc.isError {
|
||||
if err != nil {
|
||||
t.Fatalf("Received unexpected error: %#v", err)
|
||||
}
|
||||
objMetadata, err := RetrieveInventoryFromGroupingObj(tc.infos)
|
||||
if err != nil {
|
||||
t.Fatalf("Received unexpected error: %#v", err)
|
||||
}
|
||||
if len(objMetadata) > 0 {
|
||||
t.Errorf("Grouping object inventory not cleared: %#v\n", objMetadata)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrependGroupingObject(t *testing.T) {
|
||||
tests := []struct {
|
||||
infos []*resource.Info
|
||||
|
|
|
|||
|
|
@ -0,0 +1,225 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
// This map is hard-coded knowledge that a Deployment contains and
|
||||
// ReplicaSet, and that a ReplicaSet in turn contains Pods, etc., and the
|
||||
// approach to finding status being used here requires hardcoding that
|
||||
// knowledge in the status client library.
|
||||
// TODO: These should probably be defined in the observers rather than here.
|
||||
var genGroupKinds = map[schema.GroupKind][]schema.GroupKind{
|
||||
schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
|
||||
{
|
||||
Group: "apps",
|
||||
Kind: "ReplicaSet",
|
||||
},
|
||||
},
|
||||
schema.GroupKind{Group: "apps", Kind: "ReplicaSet"}: { //nolint:gofmt
|
||||
{
|
||||
Group: "",
|
||||
Kind: "Pod",
|
||||
},
|
||||
},
|
||||
schema.GroupKind{Group: "apps", Kind: "StatefulSet"}: { //nolint:gofmt
|
||||
{
|
||||
Group: "",
|
||||
Kind: "Pod",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// NewCachingClusterReader returns a new instance of the ObserverReader. The
|
||||
// ClusterReader needs will use the reader to fetch resources from the cluster,
|
||||
// while the mapper is used to resolve the version for GroupKinds. The list of
|
||||
// identifiers is needed so the ClusterReader can figure out which GroupKind
|
||||
// and namespace combinations it needs to cache when the Sync function is called.
|
||||
// We only want to fetch the resources that are actually needed.
|
||||
func NewCachingClusterReader(reader client.Reader, mapper meta.RESTMapper, identifiers []wait.ResourceIdentifier) (*CachingClusterReader, error) {
|
||||
gvkNamespaceSet := newGnSet()
|
||||
for _, id := range identifiers {
|
||||
// For every identifier, add the GroupVersionKind and namespace combination to the gvkNamespaceSet and
|
||||
// check the genGroupKinds map for any generated resources that also should be included.
|
||||
err := buildGvkNamespaceSet(mapper, []schema.GroupKind{id.GroupKind}, id.Namespace, gvkNamespaceSet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &CachingClusterReader{
|
||||
reader: reader,
|
||||
mapper: mapper,
|
||||
gns: gvkNamespaceSet.gvkNamespaces,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func buildGvkNamespaceSet(mapper meta.RESTMapper, gks []schema.GroupKind, namespace string, gvkNamespaceSet *gvkNamespaceSet) error {
|
||||
for _, gk := range gks {
|
||||
mapping, err := mapper.RESTMapping(gk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gvkNamespaceSet.add(gvkNamespace{
|
||||
GVK: mapping.GroupVersionKind,
|
||||
Namespace: namespace,
|
||||
})
|
||||
genGKs, found := genGroupKinds[gk]
|
||||
if found {
|
||||
err := buildGvkNamespaceSet(mapper, genGKs, namespace, gvkNamespaceSet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type gvkNamespaceSet struct {
|
||||
gvkNamespaces []gvkNamespace
|
||||
seen map[gvkNamespace]bool
|
||||
}
|
||||
|
||||
func newGnSet() *gvkNamespaceSet {
|
||||
return &gvkNamespaceSet{
|
||||
gvkNamespaces: make([]gvkNamespace, 0),
|
||||
seen: make(map[gvkNamespace]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *gvkNamespaceSet) add(gn gvkNamespace) {
|
||||
if _, found := g.seen[gn]; !found {
|
||||
g.gvkNamespaces = append(g.gvkNamespaces, gn)
|
||||
g.seen[gn] = true
|
||||
}
|
||||
}
|
||||
|
||||
// CachingClusterReader is an implementation of the ObserverReader interface that will
|
||||
// pre-fetch all resources needed before every sync loop. The resources needed are decided by
|
||||
// finding all combinations of GroupVersionKind and namespace referenced by the provided
|
||||
// identifiers. This list is then expanded to include any known generated resource types.
|
||||
type CachingClusterReader struct {
|
||||
sync.RWMutex
|
||||
|
||||
// reader provides functions to read and list resources from the
|
||||
// cluster.
|
||||
reader client.Reader
|
||||
|
||||
// mapper is the client-side representation of the server-side scheme. It is used
|
||||
// to resolve GroupVersionKind from GroupKind.
|
||||
mapper meta.RESTMapper
|
||||
|
||||
// gns contains the slice of all the GVK and namespace combinations that
|
||||
// should be included in the cache. This is computed based the resource identifiers
|
||||
// passed in when the CachingClusterReader is created and augmented with other
|
||||
// resource types needed to compute status (see genGroupKinds).
|
||||
gns []gvkNamespace
|
||||
|
||||
// cache contains the resources found in the cluster for the given combination
|
||||
// of GVK and namespace. Before each polling cycle, the framework will call the
|
||||
// Sync function, which is responsible for repopulating the cache.
|
||||
cache map[gvkNamespace]unstructured.UnstructuredList
|
||||
}
|
||||
|
||||
// gvkNamespace contains information about a GroupVersionKind and a namespace.
|
||||
type gvkNamespace struct {
|
||||
GVK schema.GroupVersionKind
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// Get looks up the resource identified by the key and the object GVK in the cache. If the needed combination
|
||||
// of GVK and namespace is not part of the cache, that is considered an error.
|
||||
func (c *CachingClusterReader) Get(_ context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
gvk := obj.GetObjectKind().GroupVersionKind()
|
||||
mapping, err := c.mapper.RESTMapping(gvk.GroupKind())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gn := gvkNamespace{
|
||||
GVK: gvk,
|
||||
Namespace: key.Namespace,
|
||||
}
|
||||
cacheList, found := c.cache[gn]
|
||||
if !found {
|
||||
return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
|
||||
}
|
||||
for _, u := range cacheList.Items {
|
||||
if u.GetName() == key.Name {
|
||||
obj.Object = u.Object
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.NewNotFound(mapping.Resource.GroupResource(), key.Name)
|
||||
}
|
||||
|
||||
// ListNamespaceScoped lists all resource identifier by the GVK of the list, the namespace and the selector
|
||||
// from the cache. If the needed combination of GVK and namespace is not part of the cache, that is considered an error.
|
||||
func (c *CachingClusterReader) ListNamespaceScoped(_ context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
gvk := list.GroupVersionKind()
|
||||
gn := gvkNamespace{
|
||||
GVK: gvk,
|
||||
Namespace: namespace,
|
||||
}
|
||||
|
||||
cacheList, found := c.cache[gn]
|
||||
if !found {
|
||||
return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
|
||||
}
|
||||
|
||||
var items []unstructured.Unstructured
|
||||
for _, u := range cacheList.Items {
|
||||
if selector.Matches(labels.Set(u.GetLabels())) {
|
||||
items = append(items, u)
|
||||
}
|
||||
}
|
||||
list.Items = items
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListClusterScoped lists all resource identifier by the GVK of the list and selector
|
||||
// from the cache. If the needed combination of GVK and namespace (which for clusterscoped resources
|
||||
// will always be the empty string) is not part of the cache, that is considered an error.
|
||||
func (c *CachingClusterReader) ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error {
|
||||
return c.ListNamespaceScoped(ctx, list, "", selector)
|
||||
}
|
||||
|
||||
// Sync loops over the list of gvkNamespace we know of, and uses list calls to fetch the resources.
|
||||
// This information populates the cache.
|
||||
func (c *CachingClusterReader) Sync(ctx context.Context) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
cache := make(map[gvkNamespace]unstructured.UnstructuredList)
|
||||
for _, gn := range c.gns {
|
||||
mapping, err := c.mapper.RESTMapping(gn.GVK.GroupKind())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var listOptions []client.ListOption
|
||||
if mapping.Scope == meta.RESTScopeNamespace {
|
||||
listOptions = append(listOptions, client.InNamespace(gn.Namespace))
|
||||
}
|
||||
var list unstructured.UnstructuredList
|
||||
list.SetGroupVersionKind(gn.GVK)
|
||||
err = c.reader.List(ctx, &list, listOptions...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cache[gn] = list
|
||||
}
|
||||
c.cache = cache
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,134 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"gotest.tools/assert"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/testutil"
|
||||
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
var (
|
||||
deploymentGVK = appsv1.SchemeGroupVersion.WithKind("Deployment")
|
||||
rsGVK = appsv1.SchemeGroupVersion.WithKind("ReplicaSet")
|
||||
podGVK = v1.SchemeGroupVersion.WithKind("Pod")
|
||||
)
|
||||
|
||||
func TestSync(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
identifiers []wait.ResourceIdentifier
|
||||
expectedSynced []gvkNamespace
|
||||
}{
|
||||
"no identifiers": {
|
||||
identifiers: []wait.ResourceIdentifier{},
|
||||
},
|
||||
"same GVK in multiple namespaces": {
|
||||
identifiers: []wait.ResourceIdentifier{
|
||||
{
|
||||
GroupKind: deploymentGVK.GroupKind(),
|
||||
Name: "deployment",
|
||||
Namespace: "Foo",
|
||||
},
|
||||
{
|
||||
GroupKind: deploymentGVK.GroupKind(),
|
||||
Name: "deployment",
|
||||
Namespace: "Bar",
|
||||
},
|
||||
},
|
||||
expectedSynced: []gvkNamespace{
|
||||
{
|
||||
GVK: deploymentGVK,
|
||||
Namespace: "Foo",
|
||||
},
|
||||
{
|
||||
GVK: rsGVK,
|
||||
Namespace: "Foo",
|
||||
},
|
||||
{
|
||||
GVK: podGVK,
|
||||
Namespace: "Foo",
|
||||
},
|
||||
{
|
||||
GVK: deploymentGVK,
|
||||
Namespace: "Bar",
|
||||
},
|
||||
{
|
||||
GVK: rsGVK,
|
||||
Namespace: "Bar",
|
||||
},
|
||||
{
|
||||
GVK: podGVK,
|
||||
Namespace: "Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fakeMapper := testutil.NewFakeRESTMapper(
|
||||
appsv1.SchemeGroupVersion.WithKind("Deployment"),
|
||||
appsv1.SchemeGroupVersion.WithKind("ReplicaSet"),
|
||||
v1.SchemeGroupVersion.WithKind("Pod"),
|
||||
)
|
||||
|
||||
for tn, tc := range testCases {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
fakeReader := &fakeReader{}
|
||||
|
||||
observerReader, err := NewCachingClusterReader(fakeReader, fakeMapper, tc.identifiers)
|
||||
assert.NilError(t, err)
|
||||
|
||||
err = observerReader.Sync(context.Background())
|
||||
assert.NilError(t, err)
|
||||
|
||||
synced := fakeReader.syncedGVKNamespaces
|
||||
sortGVKNamespaces(synced)
|
||||
expectedSynced := tc.expectedSynced
|
||||
sortGVKNamespaces(expectedSynced)
|
||||
assert.DeepEqual(t, expectedSynced, synced)
|
||||
|
||||
assert.Equal(t, len(tc.expectedSynced), len(observerReader.cache))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func sortGVKNamespaces(gvkNamespaces []gvkNamespace) {
|
||||
sort.Slice(gvkNamespaces, func(i, j int) bool {
|
||||
if gvkNamespaces[i].GVK.String() != gvkNamespaces[j].GVK.String() {
|
||||
return gvkNamespaces[i].GVK.String() < gvkNamespaces[j].GVK.String()
|
||||
}
|
||||
return gvkNamespaces[i].Namespace < gvkNamespaces[j].Namespace
|
||||
})
|
||||
}
|
||||
|
||||
type fakeReader struct {
|
||||
syncedGVKNamespaces []gvkNamespace
|
||||
}
|
||||
|
||||
func (f *fakeReader) Get(_ context.Context, _ client.ObjectKey, _ runtime.Object) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint:gocritic
|
||||
func (f *fakeReader) List(_ context.Context, list runtime.Object, opts ...client.ListOption) error {
|
||||
var namespace string
|
||||
for _, opt := range opts {
|
||||
switch opt := opt.(type) {
|
||||
case client.InNamespace:
|
||||
namespace = string(opt)
|
||||
}
|
||||
}
|
||||
|
||||
gvk := list.GetObjectKind().GroupVersionKind()
|
||||
f.syncedGVKNamespaces = append(f.syncedGVKNamespaces, gvkNamespace{
|
||||
GVK: gvk,
|
||||
Namespace: namespace,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
// DirectClusterReader is an implementation of the ObserverReader that just delegates all calls directly to
|
||||
// the underlying reader. No caching.
|
||||
type DirectClusterReader struct {
|
||||
Reader client.Reader
|
||||
}
|
||||
|
||||
func (n *DirectClusterReader) Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
|
||||
return n.Reader.Get(ctx, key, obj)
|
||||
}
|
||||
|
||||
func (n *DirectClusterReader) ListNamespaceScoped(ctx context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error {
|
||||
return n.Reader.List(ctx, list, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: selector})
|
||||
}
|
||||
|
||||
func (n *DirectClusterReader) ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error {
|
||||
return n.Reader.List(ctx, list, client.MatchingLabelsSelector{Selector: selector})
|
||||
}
|
||||
|
||||
func (n *DirectClusterReader) Sync(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in New Issue