From cb96d39804ac0c4713dfb6f23f2c4b68447429b3 Mon Sep 17 00:00:00 2001 From: Ole Markus With Date: Wed, 8 Jun 2022 20:30:51 +0200 Subject: [PATCH] Use dynamic client for applying channels manifest rather than calling kubectl --- channels/pkg/channels/addon.go | 8 +- channels/pkg/channels/apply.go | 136 +++++++++++++++++++----------- channels/pkg/cmd/apply_channel.go | 7 +- 3 files changed, 97 insertions(+), 54 deletions(-) diff --git a/channels/pkg/channels/addon.go b/channels/pkg/channels/addon.go index e3181a22db..8a3e626708 100644 --- a/channels/pkg/channels/addon.go +++ b/channels/pkg/channels/addon.go @@ -153,7 +153,7 @@ func (a *Addon) GetManifestFullUrl() (*url.URL, error) { return manifestURL, nil } -func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner, existingVersion *ChannelVersion) (*AddonUpdate, error) { +func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner, applier *Applier, existingVersion *ChannelVersion) (*AddonUpdate, error) { required, err := a.GetRequiredUpdates(ctx, k8sClient, cmClient, existingVersion) if err != nil { return nil, err @@ -165,7 +165,7 @@ func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interfac var merr error if required.NewVersion != nil { - err := a.updateAddon(ctx, k8sClient, pruner, required) + err := a.updateAddon(ctx, k8sClient, pruner, applier, required) if err != nil { merr = multierr.Append(merr, err) } @@ -179,7 +179,7 @@ func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interfac return required, merr } -func (a *Addon) updateAddon(ctx context.Context, k8sClient kubernetes.Interface, pruner *Pruner, required *AddonUpdate) error { +func (a *Addon) updateAddon(ctx context.Context, k8sClient kubernetes.Interface, pruner *Pruner, applier *Applier, required *AddonUpdate) error { manifestURL, err := a.GetManifestFullUrl() if err != nil { return err @@ -193,7 +193,7 @@ func (a *Addon) updateAddon(ctx context.Context, k8sClient kubernetes.Interface, return fmt.Errorf("error reading manifest: %w", err) } - if err := Apply(data); err != nil { + if err := applier.Apply(ctx, data); err != nil { return fmt.Errorf("error applying update from %q: %w", manifestURL, err) } diff --git a/channels/pkg/channels/apply.go b/channels/pkg/channels/apply.go index d787221216..9da382375b 100644 --- a/channels/pkg/channels/apply.go +++ b/channels/pkg/channels/apply.go @@ -17,76 +17,114 @@ limitations under the License. package channels import ( + "context" "fmt" - "os" - "os/exec" - "path" - "strings" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" "k8s.io/klog/v2" + "k8s.io/kops/pkg/kubemanifest" ) -// Apply calls kubectl apply to apply the manifest. -// We will likely in future change this to create things directly (or more likely embed this logic into kubectl itself) -func Apply(data []byte) error { - // We copy the manifest to a temp file because it is likely e.g. an s3 URL, which kubectl can't read - tmpDir, err := os.MkdirTemp("", "channel") +type Applier struct { + Client dynamic.Interface + RESTMapper *restmapper.DeferredDiscoveryRESTMapper +} + +// Apply applies the manifest to the cluster. +func (p *Applier) Apply(ctx context.Context, manifest []byte) error { + objects, err := kubemanifest.LoadObjectsFrom(manifest) if err != nil { - return fmt.Errorf("error creating temp dir: %v", err) + return fmt.Errorf("failed to parse objects: %w", err) } - defer func() { - if err := os.RemoveAll(tmpDir); err != nil { - klog.Warningf("error deleting temp dir %q: %v", tmpDir, err) + objectsByKind := make(map[schema.GroupKind][]*kubemanifest.Object) + for _, object := range objects { + gv, err := schema.ParseGroupVersion(object.APIVersion()) + if err != nil || gv.Version == "" { + return fmt.Errorf("failed to parse apiVersion %q", object.APIVersion()) } - }() - - localManifestFile := path.Join(tmpDir, "manifest.yaml") - if err := os.WriteFile(localManifestFile, data, 0o600); err != nil { - return fmt.Errorf("error writing temp file: %v", err) - } - // First do an apply. This may fail when removing things from lists/arrays and required fields are not removed. - { - _, err := execKubectl("apply", "-f", localManifestFile, "--server-side", "--force-conflicts", "--field-manager=kops") - if err != nil { - klog.Errorf("failed to apply the manifest: %v", err) + kind := object.Kind() + if kind == "" { + return fmt.Errorf("failed to find kind in object") } + gvk := gv.WithKind(kind) + gk := gvk.GroupKind() + objectsByKind[gk] = append(objectsByKind[gk], object) } - // Replace will force ownership on all fields to kops. But on some k8s versions, this will fail on e.g trying to set clusterIP to "". - { - _, err := execKubectl("replace", "-f", localManifestFile, "--field-manager=kops") - if err != nil { - klog.Errorf("failed to replace manifest: %v", err) + for gk := range objectsByKind { + if err := p.applyObjectsOfKind(ctx, gk, objectsByKind[gk]); err != nil { + return fmt.Errorf("failed to apply objects of kind %s: %w", gk, err) } } + return nil +} - // Do a final replace to ensure resources are correctly apply. This should always succeed if the addon is updated as expected. - { - _, err := execKubectl("apply", "-f", localManifestFile, "--server-side", "--force-conflicts", "--field-manager=kops") - if err != nil { - return fmt.Errorf("failed to apply the manifest: %w", err) - } +func (p *Applier) applyObjectsOfKind(ctx context.Context, gk schema.GroupKind, expectedObjects []*kubemanifest.Object) error { + klog.V(2).Infof("applying objects of kind: %v", gk) + + restMapping, err := p.RESTMapper.RESTMapping(gk) + if err != nil { + return fmt.Errorf("unable to find resource for %s: %w", gk, err) + } + + gvr := restMapping.Resource + + baseResource := p.Client.Resource(gvr) + + actualObjects, err := baseResource.List(ctx, v1.ListOptions{}) + if err != nil { + return fmt.Errorf("error listing objects: %w", err) + } + if err := p.applyObjects(ctx, gvr, actualObjects, expectedObjects); err != nil { + return err } return nil } -func execKubectl(args ...string) (string, error) { - kubectlPath := "kubectl" // Assume in PATH - cmd := exec.Command(kubectlPath, args...) - env := os.Environ() - cmd.Env = env - - human := strings.Join(cmd.Args, " ") - klog.V(2).Infof("Running command: %s", human) - output, err := cmd.CombinedOutput() - if err != nil { - klog.Infof("error running %s", human) - klog.Info(string(output)) - return string(output), fmt.Errorf("error running kubectl: %v", err) +func (p *Applier) applyObjects(ctx context.Context, gvr schema.GroupVersionResource, actualObjects *unstructured.UnstructuredList, expectedObjects []*kubemanifest.Object) error { + actualMap := make(map[string]unstructured.Unstructured) + for _, actualObject := range actualObjects.Items { + key := actualObject.GetNamespace() + "/" + actualObject.GetName() + actualMap[key] = actualObject } - return string(output), err + for _, expectedObjects := range expectedObjects { + name := expectedObjects.GetName() + namespace := expectedObjects.GetNamespace() + key := namespace + "/" + name + + var resource dynamic.ResourceInterface + if namespace != "" { + resource = p.Client.Resource(gvr).Namespace(namespace) + } else { + resource = p.Client.Resource(gvr) + } + + obj := expectedObjects.ToUnstructured() + + if actual, found := actualMap[key]; found { + klog.V(2).Infof("updating %s %s", gvr, key) + var opts v1.UpdateOptions + obj.SetResourceVersion(actual.GetResourceVersion()) + if _, err := resource.Update(ctx, obj, opts); err != nil { + return fmt.Errorf("failed to create %s: %w", key, err) + } + } else { + klog.V(2).Infof("creating %s %s", gvr, key) + var opts v1.CreateOptions + if _, err := resource.Create(ctx, obj, opts); err != nil { + return fmt.Errorf("failed to create %s: %w", key, err) + } + } + + } + + return nil } diff --git a/channels/pkg/cmd/apply_channel.go b/channels/pkg/cmd/apply_channel.go index ad375a0795..4947fa8436 100644 --- a/channels/pkg/cmd/apply_channel.go +++ b/channels/pkg/cmd/apply_channel.go @@ -169,10 +169,15 @@ func applyMenu(ctx context.Context, menu *channels.AddonMenu, k8sClient kubernet RESTMapper: restMapper, } + applier := &channels.Applier{ + Client: dynamicClient, + RESTMapper: restMapper, + } + var merr error for _, needUpdate := range needUpdates { - update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner, channelVersions[needUpdate.GetNamespace()+":"+needUpdate.Name]) + update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner, applier, channelVersions[needUpdate.GetNamespace()+":"+needUpdate.Name]) if err != nil { merr = multierr.Append(merr, fmt.Errorf("updating %q: %w", needUpdate.Name, err)) } else if update != nil {