Use dynamic client for applying channels manifest rather than calling kubectl

Ole Markus With 2022-06-08 20:30:51 +02:00
parent 3245d05771
commit cb96d39804
3 changed files with 97 additions and 54 deletions

View File

@@ -153,7 +153,7 @@ func (a *Addon) GetManifestFullUrl() (*url.URL, error) {
return manifestURL, nil
}
func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner, existingVersion *ChannelVersion) (*AddonUpdate, error) {
func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner, applier *Applier, existingVersion *ChannelVersion) (*AddonUpdate, error) {
required, err := a.GetRequiredUpdates(ctx, k8sClient, cmClient, existingVersion)
if err != nil {
return nil, err
@@ -165,7 +165,7 @@ func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interfac
var merr error
if required.NewVersion != nil {
err := a.updateAddon(ctx, k8sClient, pruner, required)
err := a.updateAddon(ctx, k8sClient, pruner, applier, required)
if err != nil {
merr = multierr.Append(merr, err)
}
@@ -179,7 +179,7 @@ func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interfac
return required, merr
}
func (a *Addon) updateAddon(ctx context.Context, k8sClient kubernetes.Interface, pruner *Pruner, required *AddonUpdate) error {
func (a *Addon) updateAddon(ctx context.Context, k8sClient kubernetes.Interface, pruner *Pruner, applier *Applier, required *AddonUpdate) error {
manifestURL, err := a.GetManifestFullUrl()
if err != nil {
return err
@@ -193,7 +193,7 @@ func (a *Addon) updateAddon(ctx context.Context, k8sClient kubernetes.Interface,
return fmt.Errorf("error reading manifest: %w", err)
}
if err := Apply(data); err != nil {
if err := applier.Apply(ctx, data); err != nil {
return fmt.Errorf("error applying update from %q: %w", manifestURL, err)
}

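Note: the signature changes above thread the new *Applier from the caller down to updateAddon, which now applies the manifest through the API server instead of shelling out to kubectl. A minimal sketch of the resulting call chain, assuming a dynamicClient and restMapper built by the caller (see the last file in this commit); names not shown in this diff are illustrative:

applier := &channels.Applier{Client: dynamicClient, RESTMapper: restMapper}
// EnsureUpdated -> updateAddon -> applier.Apply(ctx, data): the manifest bytes
// are read from the addon's manifest URL and then created/updated object by object.
if _, err := addon.EnsureUpdated(ctx, k8sClient, cmClient, pruner, applier, existingVersion); err != nil {
    return err
}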
View File

@@ -17,76 +17,114 @@ limitations under the License.
package channels
import (
"context"
"fmt"
"os"
"os/exec"
"path"
"strings"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/kubemanifest"
)
// Apply calls kubectl apply to apply the manifest.
// We will likely change this in the future to create things directly (or, more likely, embed this logic into kubectl itself)
func Apply(data []byte) error {
// We copy the manifest to a temp file because it is likely e.g. an s3 URL, which kubectl can't read
tmpDir, err := os.MkdirTemp("", "channel")
type Applier struct {
Client dynamic.Interface
RESTMapper *restmapper.DeferredDiscoveryRESTMapper
}
// Apply applies the manifest to the cluster.
func (p *Applier) Apply(ctx context.Context, manifest []byte) error {
objects, err := kubemanifest.LoadObjectsFrom(manifest)
if err != nil {
return fmt.Errorf("error creating temp dir: %v", err)
return fmt.Errorf("failed to parse objects: %w", err)
}
defer func() {
if err := os.RemoveAll(tmpDir); err != nil {
klog.Warningf("error deleting temp dir %q: %v", tmpDir, err)
objectsByKind := make(map[schema.GroupKind][]*kubemanifest.Object)
for _, object := range objects {
gv, err := schema.ParseGroupVersion(object.APIVersion())
if err != nil || gv.Version == "" {
return fmt.Errorf("failed to parse apiVersion %q", object.APIVersion())
}
}()
localManifestFile := path.Join(tmpDir, "manifest.yaml")
if err := os.WriteFile(localManifestFile, data, 0o600); err != nil {
return fmt.Errorf("error writing temp file: %v", err)
}
// First do an apply. This may fail when removing things from lists/arrays and required fields are not removed.
{
_, err := execKubectl("apply", "-f", localManifestFile, "--server-side", "--force-conflicts", "--field-manager=kops")
if err != nil {
klog.Errorf("failed to apply the manifest: %v", err)
kind := object.Kind()
if kind == "" {
return fmt.Errorf("failed to find kind in object")
}
gvk := gv.WithKind(kind)
gk := gvk.GroupKind()
objectsByKind[gk] = append(objectsByKind[gk], object)
}
// Replace will force ownership of all fields to kops. But on some k8s versions, this will fail on e.g. trying to set clusterIP to "".
{
_, err := execKubectl("replace", "-f", localManifestFile, "--field-manager=kops")
if err != nil {
klog.Errorf("failed to replace manifest: %v", err)
for gk := range objectsByKind {
if err := p.applyObjectsOfKind(ctx, gk, objectsByKind[gk]); err != nil {
return fmt.Errorf("failed to apply objects of kind %s: %w", gk, err)
}
}
return nil
}
// Do a final apply to ensure resources are correctly applied. This should always succeed if the addon is updated as expected.
{
_, err := execKubectl("apply", "-f", localManifestFile, "--server-side", "--force-conflicts", "--field-manager=kops")
if err != nil {
return fmt.Errorf("failed to apply the manifest: %w", err)
}
func (p *Applier) applyObjectsOfKind(ctx context.Context, gk schema.GroupKind, expectedObjects []*kubemanifest.Object) error {
klog.V(2).Infof("applying objects of kind: %v", gk)
restMapping, err := p.RESTMapper.RESTMapping(gk)
if err != nil {
return fmt.Errorf("unable to find resource for %s: %w", gk, err)
}
gvr := restMapping.Resource
baseResource := p.Client.Resource(gvr)
actualObjects, err := baseResource.List(ctx, v1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing objects: %w", err)
}
if err := p.applyObjects(ctx, gvr, actualObjects, expectedObjects); err != nil {
return err
}
return nil
}
func execKubectl(args ...string) (string, error) {
kubectlPath := "kubectl" // Assume in PATH
cmd := exec.Command(kubectlPath, args...)
env := os.Environ()
cmd.Env = env
human := strings.Join(cmd.Args, " ")
klog.V(2).Infof("Running command: %s", human)
output, err := cmd.CombinedOutput()
if err != nil {
klog.Infof("error running %s", human)
klog.Info(string(output))
return string(output), fmt.Errorf("error running kubectl: %v", err)
func (p *Applier) applyObjects(ctx context.Context, gvr schema.GroupVersionResource, actualObjects *unstructured.UnstructuredList, expectedObjects []*kubemanifest.Object) error {
actualMap := make(map[string]unstructured.Unstructured)
for _, actualObject := range actualObjects.Items {
key := actualObject.GetNamespace() + "/" + actualObject.GetName()
actualMap[key] = actualObject
}
return string(output), err
for _, expectedObject := range expectedObjects {
name := expectedObject.GetName()
namespace := expectedObject.GetNamespace()
key := namespace + "/" + name
var resource dynamic.ResourceInterface
if namespace != "" {
resource = p.Client.Resource(gvr).Namespace(namespace)
} else {
resource = p.Client.Resource(gvr)
}
obj := expectedObject.ToUnstructured()
if actual, found := actualMap[key]; found {
klog.V(2).Infof("updating %s %s", gvr, key)
var opts v1.UpdateOptions
obj.SetResourceVersion(actual.GetResourceVersion())
if _, err := resource.Update(ctx, obj, opts); err != nil {
return fmt.Errorf("failed to create %s: %w", key, err)
}
} else {
klog.V(2).Infof("creating %s %s", gvr, key)
var opts v1.CreateOptions
if _, err := resource.Create(ctx, obj, opts); err != nil {
return fmt.Errorf("failed to create %s: %w", key, err)
}
}
}
return nil
}

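Taken together, Applier.Apply parses the manifest into objects, groups them by GroupKind, resolves each kind to a GroupVersionResource via the RESTMapper, lists the existing objects, and then updates objects that already exist (carrying over the resourceVersion) or creates the ones that do not. A hedged usage sketch, assuming the usual client-go imports and the kops channels package; the function name and file path are illustrative:

func applyManifestFile(ctx context.Context, dynamicClient dynamic.Interface, restMapper *restmapper.DeferredDiscoveryRESTMapper, path string) error {
    // Build the Applier the same way the command does in the next file.
    applier := &channels.Applier{
        Client:     dynamicClient,
        RESTMapper: restMapper,
    }
    data, err := os.ReadFile(path) // in the addon flow, these bytes come from the channel's manifest URL
    if err != nil {
        return fmt.Errorf("reading manifest: %w", err)
    }
    if err := applier.Apply(ctx, data); err != nil {
        return fmt.Errorf("applying manifest: %w", err)
    }
    return nil
}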
View File

@@ -169,10 +169,15 @@ func applyMenu(ctx context.Context, menu *channels.AddonMenu, k8sClient kubernet
RESTMapper: restMapper,
}
applier := &channels.Applier{
Client: dynamicClient,
RESTMapper: restMapper,
}
var merr error
for _, needUpdate := range needUpdates {
update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner, channelVersions[needUpdate.GetNamespace()+":"+needUpdate.Name])
update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner, applier, channelVersions[needUpdate.GetNamespace()+":"+needUpdate.Name])
if err != nil {
merr = multierr.Append(merr, fmt.Errorf("updating %q: %w", needUpdate.Name, err))
} else if update != nil {
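The dynamicClient and restMapper used to construct the Applier above are built earlier in applyMenu and are outside this hunk. A typical construction from a *rest.Config might look like the following sketch (an assumption, not part of this commit):

import (
    "fmt"

    "k8s.io/client-go/discovery"
    "k8s.io/client-go/discovery/cached/memory"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/restmapper"
)

// Sketch: build the clients the Applier needs from a rest.Config.
func buildApplierClients(restConfig *rest.Config) (dynamic.Interface, *restmapper.DeferredDiscoveryRESTMapper, error) {
    dynamicClient, err := dynamic.NewForConfig(restConfig)
    if err != nil {
        return nil, nil, fmt.Errorf("building dynamic client: %w", err)
    }
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig)
    if err != nil {
        return nil, nil, fmt.Errorf("building discovery client: %w", err)
    }
    // The deferred RESTMapper resolves GroupKind to a resource lazily via discovery,
    // so CRD-backed addon objects can be mapped as they appear.
    restMapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))
    return dynamicClient, restMapper, nil
}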