From 6184f1286164c859c6e1214b2681eefe86bc3d74 Mon Sep 17 00:00:00 2001 From: Justin SB Date: Fri, 22 Jul 2022 08:43:35 -0700 Subject: [PATCH 1/2] Introduce applylib A number of projects need a library for applying objects. While we are figuring out exactly what functionality is needed, we are using a copy-and-paste approach. Introduce applylib here also, and add the create-or-update functionality that we want here. --- channels/pkg/channels/clientapplier.go | 114 ++++++----------- pkg/applylib/applyset/applyset.go | 163 +++++++++++++++++++++++++ pkg/applylib/applyset/health.go | 89 ++++++++++++++ pkg/applylib/applyset/interfaces.go | 37 ++++++ pkg/applylib/applyset/results.go | 78 ++++++++++++ pkg/applylib/applyset/tracker.go | 104 ++++++++++++++++ pkg/applylib/doc.go | 21 ++++ pkg/kubemanifest/manifest.go | 12 +- 8 files changed, 538 insertions(+), 80 deletions(-) create mode 100644 pkg/applylib/applyset/applyset.go create mode 100644 pkg/applylib/applyset/health.go create mode 100644 pkg/applylib/applyset/interfaces.go create mode 100644 pkg/applylib/applyset/results.go create mode 100644 pkg/applylib/applyset/tracker.go create mode 100644 pkg/applylib/doc.go diff --git a/channels/pkg/channels/clientapplier.go b/channels/pkg/channels/clientapplier.go index f811d3049b..85a1205964 100644 --- a/channels/pkg/channels/clientapplier.go +++ b/channels/pkg/channels/clientapplier.go @@ -18,19 +18,13 @@ package channels import ( "context" - "encoding/json" "fmt" - "go.uber.org/multierr" - "k8s.io/apimachinery/pkg/api/meta" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic" "k8s.io/client-go/restmapper" - "k8s.io/klog/v2" + "k8s.io/kops/pkg/applylib/applyset" "k8s.io/kops/pkg/kubemanifest" - "k8s.io/kops/upup/pkg/fi" ) type ClientApplier struct { @@ -45,89 +39,51 @@ func (p *ClientApplier) Apply(ctx context.Context, manifest []byte) error { return fmt.Errorf("failed to parse objects: %w", err) } - objectsByGVK := make(map[schema.GroupVersionKind][]*kubemanifest.Object) - for _, object := range objects { - key := object.GetNamespace() + "/" + object.GetName() - gv, err := schema.ParseGroupVersion(object.APIVersion()) - if err != nil || gv.Version == "" { - return fmt.Errorf("failed to parse apiVersion %q in object %s", object.APIVersion(), key) - } - kind := object.Kind() - if kind == "" { - return fmt.Errorf("failed to find kind in object %s", key) - } - - gvk := gv.WithKind(kind) - objectsByGVK[gvk] = append(objectsByGVK[gvk], object) + // TODO: Cache applyset for more efficient applying + patchOptions := metav1.PatchOptions{ + FieldManager: "kops", } - var applyErrors error - for gvk := range objectsByGVK { - if err := p.applyObjectsOfKind(ctx, gvk, objectsByGVK[gvk]); err != nil { - applyErrors = multierr.Append(applyErrors, fmt.Errorf("failed to apply objects of kind %s: %w", gvk, err)) - } - } - return applyErrors -} + // We force to overcome errors like: Apply failed with 1 conflict: conflict with "kubectl-client-side-apply" using apps/v1: .spec.template.spec.containers[name="foo"].image + // TODO: How to handle this better? In a controller we don't have a choice and have to force eventually. + // But we could do something like try first without forcing, log the conflict if there is one, and then force. + // This would mean that if there was a loop we could log/detect it. + // We could even do things like back-off on the force apply. + force := true + patchOptions.Force = &force -func (p *ClientApplier) applyObjectsOfKind(ctx context.Context, gvk schema.GroupVersionKind, expectedObjects []*kubemanifest.Object) error { - klog.V(2).Infof("applying objects of kind: %v", gvk) - - restMapping, err := p.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + s, err := applyset.New(applyset.Options{ + RESTMapper: p.RESTMapper, + Client: p.Client, + PatchOptions: patchOptions, + }) if err != nil { - return fmt.Errorf("unable to find resource for %s: %w", gvk, err) - } - - if err := p.applyObjects(ctx, restMapping, expectedObjects); err != nil { return err } - return nil -} - -func (p *ClientApplier) applyObjects(ctx context.Context, restMapping *meta.RESTMapping, expectedObjects []*kubemanifest.Object) error { - var merr error - - for _, expectedObject := range expectedObjects { - err := p.patchObject(ctx, restMapping, expectedObject) - merr = multierr.Append(merr, err) + var applyableObjects []applyset.ApplyableObject + for _, object := range objects { + applyableObjects = append(applyableObjects, object) + } + if err := s.SetDesiredObjects(applyableObjects); err != nil { + return err } - return merr -} - -func (p *ClientApplier) patchObject(ctx context.Context, restMapping *meta.RESTMapping, expectedObject *kubemanifest.Object) error { - gvr := restMapping.Resource - name := expectedObject.GetName() - namespace := expectedObject.GetNamespace() - key := namespace + "/" + name - - var resource dynamic.ResourceInterface - - if restMapping.Scope.Name() == meta.RESTScopeNameNamespace { - if namespace == "" { - return fmt.Errorf("namespace not set for namespace-scoped object %q", key) - } - resource = p.Client.Resource(gvr).Namespace(namespace) - } else { - if namespace != "" { - return fmt.Errorf("namespace was set for cluster-scoped object %q", key) - } - resource = p.Client.Resource(gvr) - } - - obj := expectedObject.ToUnstructured() - - jsonData, err := json.Marshal(obj) + results, err := s.ApplyOnce(ctx) if err != nil { - return fmt.Errorf("failed to marsal %q into json: %w", obj.GetName(), err) + return fmt.Errorf("failed to apply objects: %w", err) } - { - _, err := resource.Patch(ctx, obj.GetName(), types.ApplyPatchType, jsonData, v1.PatchOptions{FieldManager: "kops", Force: fi.Bool(true)}) - if err != nil { - return fmt.Errorf("failed to patch object %q: %w", obj.GetName(), err) - } + // TODO: Implement pruning + + if !results.AllApplied() { + return fmt.Errorf("not all objects were applied") } + + // TODO: Check object health status + if !results.AllHealthy() { + return fmt.Errorf("not all objects were healthy") + } + return nil } diff --git a/pkg/applylib/applyset/applyset.go b/pkg/applylib/applyset/applyset.go new file mode 100644 index 0000000000..3cc5931b19 --- /dev/null +++ b/pkg/applylib/applyset/applyset.go @@ -0,0 +1,163 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" +) + +// ApplySet is a set of objects that we want to apply to the cluster. +// +// An ApplySet has a few cases which it tries to optimize for: +// * We can change the objects we're applying +// * We want to watch the objects we're applying / be notified of changes +// * We want to know when the objects we apply are "healthy" +// * We expose a "try once" method to better support running from a controller. +// +// TODO: Pluggable health functions. +// TODO: Pruning +type ApplySet struct { + // client is the dynamic kubernetes client used to apply objects to the k8s cluster. + client dynamic.Interface + // restMapper is used to map object kind to resources, and to know if objects are cluster-scoped. + restMapper meta.RESTMapper + // patchOptions holds the options used when applying, in particular the fieldManager + patchOptions metav1.PatchOptions + + // mutex guards trackers + mutex sync.Mutex + // trackers is a (mutable) pointer to the (immutable) objectTrackerList, containing a list of objects we are applying. + trackers *objectTrackerList +} + +// Options holds the parameters for building an ApplySet. +type Options struct { + // Client is the dynamic kubernetes client used to apply objects to the k8s cluster. + Client dynamic.Interface + // RESTMapper is used to map object kind to resources, and to know if objects are cluster-scoped. + RESTMapper meta.RESTMapper + // PatchOptions holds the options used when applying, in particular the fieldManager + PatchOptions metav1.PatchOptions +} + +// New constructs a new ApplySet +func New(options Options) (*ApplySet, error) { + a := &ApplySet{ + client: options.Client, + restMapper: options.RESTMapper, + patchOptions: options.PatchOptions, + } + a.trackers = &objectTrackerList{} + return a, nil +} + +// SetDesiredObjects is used to replace the desired state of all the objects. +// Any objects not specified are removed from the "desired" set. +func (a *ApplySet) SetDesiredObjects(objects []ApplyableObject) error { + a.mutex.Lock() + defer a.mutex.Unlock() + + newTrackers := a.trackers.setDesiredObjects(objects) + a.trackers = newTrackers + + return nil +} + +// ApplyOnce will make one attempt to apply all objects and observe their health. +// It does not wait for the objects to become healthy, but will report their health. +// +// TODO: Limit the amount of time this takes, particularly if we have thousands of objects. +// +// We don't _have_ to try to apply all objects if it is taking too long. +// +// TODO: We re-apply every object every iteration; we should be able to do better. +func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) { + // snapshot the state + a.mutex.Lock() + trackers := a.trackers + a.mutex.Unlock() + + results := &ApplyResults{total: len(trackers.items)} + + for i := range trackers.items { + tracker := &trackers.items[i] + obj := tracker.desired + + name := obj.GetName() + ns := obj.GetNamespace() + gvk := obj.GroupVersionKind() + nn := types.NamespacedName{Namespace: ns, Name: name} + + restMapping, err := a.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + results.applyError(gvk, nn, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err)) + continue + } + gvr := restMapping.Resource + + var dynamicResource dynamic.ResourceInterface + + switch restMapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + if ns == "" { + // TODO: Differentiate between server-fixable vs client-fixable errors? + results.applyError(gvk, nn, fmt.Errorf("namespace was not provided for namespace-scoped object %v", gvk)) + continue + } + dynamicResource = a.client.Resource(gvr).Namespace(ns) + + case meta.RESTScopeNameRoot: + if ns != "" { + // TODO: Differentiate between server-fixable vs client-fixable errors? + results.applyError(gvk, nn, fmt.Errorf("namespace %q was provided for cluster-scoped object %v", obj.GetNamespace(), gvk)) + continue + } + dynamicResource = a.client.Resource(gvr) + + default: + // Internal error ... this is panic-level + return nil, fmt.Errorf("unknown scope for gvk %s: %q", gvk, restMapping.Scope.Name()) + } + + j, err := json.Marshal(obj) + if err != nil { + // TODO: Differentiate between server-fixable vs client-fixable errors? + results.applyError(gvk, nn, fmt.Errorf("failed to marshal object to JSON: %w", err)) + continue + } + + lastApplied, err := dynamicResource.Patch(ctx, name, types.ApplyPatchType, j, a.patchOptions) + if err != nil { + results.applyError(gvk, nn, fmt.Errorf("error from apply: %w", err)) + continue + } + + tracker.lastApplied = lastApplied + results.applySuccess(gvk, nn) + tracker.isHealthy = isHealthy(lastApplied) + results.reportHealth(gvk, nn, tracker.isHealthy) + } + return results, nil +} diff --git a/pkg/applylib/applyset/health.go b/pkg/applylib/applyset/health.go new file mode 100644 index 0000000000..aaa0638991 --- /dev/null +++ b/pkg/applylib/applyset/health.go @@ -0,0 +1,89 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "encoding/json" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog/v2" +) + +// isHealthy reports whether the object should be considered "healthy" +// TODO: Replace with kstatus library +func isHealthy(u *unstructured.Unstructured) bool { + ready := true + + statusConditions, found, err := unstructured.NestedFieldNoCopy(u.Object, "status", "conditions") + if err != nil || !found { + klog.Infof("status conditions not found for %s", u.GroupVersionKind()) + return true + } + + statusConditionsList, ok := statusConditions.([]interface{}) + if !ok { + klog.Warningf("expected status.conditions to be list, got %T", statusConditions) + return true + } + for i := range statusConditionsList { + condition := statusConditionsList[i] + conditionMap, ok := condition.(map[string]interface{}) + if !ok { + klog.Warningf("expected status.conditions[%d] to be map, got %T", i, condition) + return true + } + + conditionType := "" + conditionStatus := "" + for k, v := range conditionMap { + switch k { + case "type": + s, ok := v.(string) + if !ok { + klog.Warningf("expected status.conditions[].type to be string, got %T", v) + } else { + conditionType = s + } + case "status": + s, ok := v.(string) + if !ok { + klog.Warningf("expected status.conditions[].status to be string, got %T", v) + } else { + conditionStatus = s + } + } + } + + // TODO: Check conditionType? + + switch conditionStatus { + case "True": + // ready + + case "False": + j, _ := json.Marshal(condition) + klog.Infof("status.conditions indicates object is not ready: %v", string(j)) + ready = false + + case "": + klog.Warningf("ignoring status.conditions[] type %q with unknown status %q", conditionType, conditionStatus) + } + } + + klog.Infof("isHealthy %s %s/%s => %v", u.GroupVersionKind(), u.GetNamespace(), u.GetName(), ready) + return ready +} diff --git a/pkg/applylib/applyset/interfaces.go b/pkg/applylib/applyset/interfaces.go new file mode 100644 index 0000000000..4014dd28d4 --- /dev/null +++ b/pkg/applylib/applyset/interfaces.go @@ -0,0 +1,37 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "encoding/json" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// ApplyableObject is implemented by objects that can be applied to the cluster. +// We don't need much, so this might allow for more efficient implementations in future. +type ApplyableObject interface { + // GroupVersionKind returns the GroupVersionKind structure describing the type of the object + GroupVersionKind() schema.GroupVersionKind + // GetNamespace returns the namespace of the object + GetNamespace() string + // GetName returns the name of the object + GetName() string + + // The object should implement json marshalling + json.Marshaler +} diff --git a/pkg/applylib/applyset/results.go b/pkg/applylib/applyset/results.go new file mode 100644 index 0000000000..f5b2f16ade --- /dev/null +++ b/pkg/applylib/applyset/results.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// ApplyResults contains the results of an Apply operation. +type ApplyResults struct { + total int + applySuccessCount int + applyFailCount int + healthyCount int + unhealthyCount int +} + +// AllApplied is true if the desired state has been successfully applied for all objects. +// Note: you likely also want to check AllHealthy, if you want to be sure the objects are "ready". +func (r *ApplyResults) AllApplied() bool { + r.checkInvariants() + + return r.applyFailCount == 0 +} + +// AllHealthy is true if all the objects have been applied and have converged to a "ready" state. +// Note that this is only meaningful if AllApplied is true. +func (r *ApplyResults) AllHealthy() bool { + r.checkInvariants() + + return r.unhealthyCount == 0 +} + +// checkInvariants is an internal function that warns if the object doesn't match the expected invariants. +func (r *ApplyResults) checkInvariants() { + if r.total != (r.applySuccessCount + r.applyFailCount) { + klog.Warningf("consistency error (apply counts): %#v", r) + } else if r.total != (r.healthyCount + r.unhealthyCount) { + // This "invariant" only holds when all objects could be applied + klog.Warningf("consistency error (healthy counts): %#v", r) + } +} + +// applyError records that the apply of an object failed with an error. +func (r *ApplyResults) applyError(gvk schema.GroupVersionKind, nn types.NamespacedName, err error) { + r.applyFailCount++ + klog.Warningf("error from apply on %s %s: %v", gvk, nn, err) +} + +// applySuccess records that an object was applied and this succeeded. +func (r *ApplyResults) applySuccess(gvk schema.GroupVersionKind, nn types.NamespacedName) { + r.applySuccessCount++ +} + +// reportHealth records the health of an object. +func (r *ApplyResults) reportHealth(gvk schema.GroupVersionKind, nn types.NamespacedName, isHealthy bool) { + if isHealthy { + r.healthyCount++ + } else { + r.unhealthyCount++ + } +} diff --git a/pkg/applylib/applyset/tracker.go b/pkg/applylib/applyset/tracker.go new file mode 100644 index 0000000000..b32950bf90 --- /dev/null +++ b/pkg/applylib/applyset/tracker.go @@ -0,0 +1,104 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "reflect" + + "k8s.io/apimachinery/pkg/runtime" +) + +// objectTrackerList is a list of objectTrackers, containing the state of the objects we are trying to apply. +// objectTrackerList is immutable (though objectTracker is mutable); we copy-on-write when the list changes. +// TODO: Given objectTracker is mutable, should we just make objectTrackerList mutable? +type objectTrackerList struct { + items []objectTracker +} + +// objectTracker tracks the state for a single object +type objectTracker struct { + desired ApplyableObject + lastApplied runtime.Object + + desiredIsApplied bool + isHealthy bool +} + +// objectKey is the key used in maps; we consider objects with the same GVKNN the same. +type objectKey struct { + Group string + Version string + Kind string + Namespace string + Name string +} + +// computeKey returns the unique key for the object. +func computeKey(u ApplyableObject) objectKey { + gvk := u.GroupVersionKind() + return objectKey{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind, + Namespace: u.GetNamespace(), + Name: u.GetName(), + } +} + +// setDesiredObjects completely replaces the set of objects we are interested in. +// We aim to reuse the current state where it carries over. +// Because objectTrackerList is immutable, we copy-on-write to a new objectTrackerList and return it. +func (l *objectTrackerList) setDesiredObjects(objects []ApplyableObject) *objectTrackerList { + existingTrackers := make(map[objectKey]*objectTracker) + for i := range l.items { + tracker := &l.items[i] + key := computeKey(tracker.desired) + existingTrackers[key] = tracker + } + + newList := &objectTrackerList{} + + for _, obj := range objects { + key := computeKey(obj) + // TODO: Detect duplicate keys? + existingTracker := existingTrackers[key] + if existingTracker == nil { + newList.items = append(newList.items, objectTracker{ + desired: obj, + lastApplied: nil, + desiredIsApplied: false, + isHealthy: false, + }) + } else if reflect.DeepEqual(existingTracker.desired, obj) { + newList.items = append(newList.items, objectTracker{ + desired: obj, + lastApplied: existingTracker.lastApplied, + desiredIsApplied: existingTracker.desiredIsApplied, + isHealthy: existingTracker.isHealthy, + }) + } else { + newList.items = append(newList.items, objectTracker{ + desired: obj, + lastApplied: existingTracker.lastApplied, + desiredIsApplied: false, + isHealthy: existingTracker.isHealthy, + }) + } + } + + return newList +} diff --git a/pkg/applylib/doc.go b/pkg/applylib/doc.go new file mode 100644 index 0000000000..a450be15a4 --- /dev/null +++ b/pkg/applylib/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applylib + +// Package applylib implements a helper library for applying a set of objects to a cluster. +// This functionality is needed across multiple projects, and we are pursuing a "copy-and-paste" +// reuse strategy while we decide what functionality is needed and where this library should live. diff --git a/pkg/kubemanifest/manifest.go b/pkg/kubemanifest/manifest.go index a490fc1cd8..33d2c620b4 100644 --- a/pkg/kubemanifest/manifest.go +++ b/pkg/kubemanifest/manifest.go @@ -18,6 +18,7 @@ package kubemanifest import ( "bytes" + "encoding/json" "fmt" "strings" @@ -44,6 +45,7 @@ func (o *Object) ToUnstructured() *unstructured.Unstructured { return &unstructured.Unstructured{Object: o.data} } +// GroupVersionKind returns the group/version/kind information for the object func (o *Object) GroupVersionKind() schema.GroupVersionKind { return o.ToUnstructured().GroupVersionKind() } @@ -126,7 +128,15 @@ func (l ObjectList) ToYAML() ([]byte, error) { func (m *Object) ToYAML() ([]byte, error) { b, err := yaml.Marshal(m.data) if err != nil { - return nil, fmt.Errorf("error marshaling manifest to yaml: %v", err) + return nil, fmt.Errorf("error marshaling manifest to yaml: %w", err) + } + return b, nil +} + +func (m *Object) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(m.data) + if err != nil { + return nil, fmt.Errorf("error marshaling manifest to json: %w", err) } return b, nil } From 89201877e0f83efe37aaa1e435f2d37dc438249a Mon Sep 17 00:00:00 2001 From: Justin SB Date: Fri, 22 Jul 2022 10:09:20 -0700 Subject: [PATCH 2/2] Introduce mock kubeapiserver to applylib Use it for a simple test. --- pkg/applylib/applyset/applyset_test.go | 128 +++++++ .../testdata/testapplyset/expected.yaml | 20 ++ pkg/applylib/mocks/harness.go | 200 +++++++++++ .../mocks/mockkubeapiserver/apigrouplist.go | 76 ++++ .../mockkubeapiserver/apiresourcelist.go | 49 +++ .../mocks/mockkubeapiserver/apiserver.go | 324 ++++++++++++++++++ .../mocks/mockkubeapiserver/apiversions.go | 33 ++ .../mocks/mockkubeapiserver/controllers.go | 59 ++++ .../mocks/mockkubeapiserver/getresource.go | 67 ++++ .../mocks/mockkubeapiserver/patchresource.go | 117 +++++++ .../mocks/mockkubeapiserver/putresource.go | 82 +++++ 11 files changed, 1155 insertions(+) create mode 100644 pkg/applylib/applyset/applyset_test.go create mode 100644 pkg/applylib/applyset/testdata/testapplyset/expected.yaml create mode 100644 pkg/applylib/mocks/harness.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/apigrouplist.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/apiresourcelist.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/apiserver.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/apiversions.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/controllers.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/getresource.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/patchresource.go create mode 100644 pkg/applylib/mocks/mockkubeapiserver/putresource.go diff --git a/pkg/applylib/applyset/applyset_test.go b/pkg/applylib/applyset/applyset_test.go new file mode 100644 index 0000000000..b633e100f5 --- /dev/null +++ b/pkg/applylib/applyset/applyset_test.go @@ -0,0 +1,128 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package applyset + +import ( + "path/filepath" + "strings" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kops/pkg/applylib/mocks" + "k8s.io/kops/pkg/testutils/golden" + "sigs.k8s.io/yaml" +) + +func TestApplySet(t *testing.T) { + h := mocks.NewHarness(t) + + existing := `` + + apply := ` +apiVersion: v1 +kind: Namespace +metadata: + name: test-applyset + +--- + +apiVersion: v1 +kind: ConfigMap +metadata: + name: foo + namespace: test-applyset +data: + foo: bar +` + + h.WithObjects(h.ParseObjects(existing)...) + + applyObjects := h.ParseObjects(apply) + + patchOptions := metav1.PatchOptions{ + FieldManager: "kops", + } + + force := true + patchOptions.Force = &force + + s, err := New(Options{ + RESTMapper: h.RESTMapper(), + Client: h.DynamicClient(), + PatchOptions: patchOptions, + }) + if err != nil { + h.Fatalf("error building applyset object: %v", err) + } + + var applyableObjects []ApplyableObject + for _, object := range applyObjects { + applyableObjects = append(applyableObjects, object) + } + if err := s.SetDesiredObjects(applyableObjects); err != nil { + h.Fatalf("failed to set desired objects: %v", err) + } + + results, err := s.ApplyOnce(h.Ctx) + if err != nil { + h.Fatalf("failed to apply objects: %v", err) + } + + // TODO: Implement pruning + + if !results.AllApplied() { + h.Fatalf("not all objects were applied") + } + + // TODO: Check object health status + if !results.AllHealthy() { + h.Fatalf("not all objects were healthy") + } + + var actual []string + + for _, object := range applyObjects { + id := types.NamespacedName{ + Namespace: object.GetNamespace(), + Name: object.GetName(), + } + + u := &unstructured.Unstructured{} + u.SetAPIVersion(object.GetAPIVersion()) + u.SetKind(object.GetKind()) + + if err := h.Client().Get(h.Ctx, id, u); err != nil { + h.Fatalf("failed to get object %v: %v", id, err) + } + + metadata := u.Object["metadata"].(map[string]interface{}) + delete(metadata, "creationTimestamp") + delete(metadata, "managedFields") + delete(metadata, "resourceVersion") + delete(metadata, "uid") + + y, err := yaml.Marshal(u) + if err != nil { + h.Fatalf("failed to marshal object %v: %v", id, err) + } + actual = append(actual, string(y)) + } + testDir := filepath.Join("testdata", strings.ToLower(t.Name())) + golden.AssertMatchesFile(t, strings.Join(actual, "\n---\n"), filepath.Join(testDir, "expected.yaml")) +} diff --git a/pkg/applylib/applyset/testdata/testapplyset/expected.yaml b/pkg/applylib/applyset/testdata/testapplyset/expected.yaml new file mode 100644 index 0000000000..5e18e04fa3 --- /dev/null +++ b/pkg/applylib/applyset/testdata/testapplyset/expected.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Namespace +metadata: + labels: + kubernetes.io/metadata.name: test-applyset + name: test-applyset +spec: + finalizers: + - kubernetes +status: + phase: Active + +--- +apiVersion: v1 +data: + foo: bar +kind: ConfigMap +metadata: + name: foo + namespace: test-applyset diff --git a/pkg/applylib/mocks/harness.go b/pkg/applylib/mocks/harness.go new file mode 100644 index 0000000000..d72f17669b --- /dev/null +++ b/pkg/applylib/mocks/harness.go @@ -0,0 +1,200 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + "bytes" + "context" + "flag" + "io" + "strings" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + yamlserializer "k8s.io/apimachinery/pkg/runtime/serializer/yaml" + yamlutil "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/discovery/cached/disk" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + "k8s.io/kops/pkg/applylib/mocks/mockkubeapiserver" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var kubeconfig = flag.String("kubeconfig", "", "set to use a real kube-apiserver") + +type Harness struct { + *testing.T + + k8s *mockkubeapiserver.MockKubeAPIServer + restConfig *rest.Config + restMapper *restmapper.DeferredDiscoveryRESTMapper + + Scheme *runtime.Scheme + Ctx context.Context + client client.Client +} + +func NewHarness(t *testing.T) *Harness { + h := &Harness{ + T: t, + Scheme: runtime.NewScheme(), + Ctx: context.Background(), + } + corev1.AddToScheme(h.Scheme) + + t.Cleanup(h.Stop) + return h +} + +func (h *Harness) ParseObjects(y string) []*unstructured.Unstructured { + t := h.T + + var objects []*unstructured.Unstructured + + decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(y)), 100) + for { + var rawObj runtime.RawExtension + if err := decoder.Decode(&rawObj); err != nil { + if err != io.EOF { + t.Fatalf("error decoding yaml: %v", err) + } + break + } + + m, _, err := yamlserializer.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil) + if err != nil { + t.Fatalf("error decoding yaml: %v", err) + } + + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(m) + if err != nil { + t.Fatalf("error parsing object: %v", err) + } + unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap} + + objects = append(objects, unstructuredObj) + } + + return objects +} + +func (h *Harness) WithObjects(initObjs ...*unstructured.Unstructured) { + if *kubeconfig == "" { + k8s, err := mockkubeapiserver.NewMockKubeAPIServer(":0") + if err != nil { + h.Fatalf("error building mock kube-apiserver: %v", err) + } + h.k8s = k8s + + // TODO: Discover from scheme? + k8s.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}, "namespaces", meta.RESTScopeRoot) + k8s.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"}, "secrets", meta.RESTScopeNamespace) + k8s.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, "configmaps", meta.RESTScopeNamespace) + + addr, err := k8s.StartServing() + if err != nil { + h.Errorf("error starting mock kube-apiserver: %v", err) + } + + h.restConfig = &rest.Config{ + Host: addr.String(), + } + } else { + kubeconfigPath := *kubeconfig + if strings.HasPrefix(kubeconfigPath, "~/") { + homeDir := homedir.HomeDir() + kubeconfigPath = strings.Replace(kubeconfigPath, "~/", homeDir+"/", 1) + } + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + h.Fatalf("error building kubeconfig for %q: %v", kubeconfigPath, err) + } + + h.restConfig = restConfig + } + + client, err := client.New(h.RESTConfig(), client.Options{}) + if err != nil { + h.Fatalf("error building client: %v", err) + } + for _, obj := range initObjs { + if err := client.Create(h.Ctx, obj); err != nil { + h.Errorf("error creating object %v: %v", obj, err) + } + } + + h.client = client +} + +func (h *Harness) Stop() { + if h.k8s != nil { + if err := h.k8s.Stop(); err != nil { + h.Errorf("error closing mock kube-apiserver: %v", err) + } + } +} + +func (h *Harness) DynamicClient() dynamic.Interface { + dynamicClient, err := dynamic.NewForConfig(h.RESTConfig()) + if err != nil { + h.Fatalf("error building dynamicClient: %v", err) + } + return dynamicClient +} + +func (h *Harness) Client() client.Client { + if h.client == nil { + h.Fatalf("must call Start() before Client()") + } + return h.client +} + +func (h *Harness) RESTConfig() *rest.Config { + if h.restConfig == nil { + h.Fatalf("cannot call RESTConfig before Start") + } + return h.restConfig +} + +func (h *Harness) RESTMapper() *restmapper.DeferredDiscoveryRESTMapper { + if h.restMapper == nil { + // discoveryClient, err := discovery.NewDiscoveryClientForConfig(h.RESTConfig()) + // if err != nil { + // h.Fatalf("error building discovery client: %") + // } + + // TODO: Use memory cache or simplified rest mapper + discoveryClient, err := disk.NewCachedDiscoveryClientForConfig(h.RESTConfig(), "/home/justinsb/tmp/discovery", "", time.Minute) + if err != nil { + h.Fatalf("error building discovery client: %v", err) + } + + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + + h.restMapper = restMapper + } + + return h.restMapper +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/apigrouplist.go b/pkg/applylib/mocks/mockkubeapiserver/apigrouplist.go new file mode 100644 index 0000000000..60d3a90f55 --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/apigrouplist.go @@ -0,0 +1,76 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + "sort" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// apiGroupList is a request for api discovery, such as GET /apis +type apiGroupList struct { + baseRequest +} + +// Run serves the GET /apis endpoint +func (r *apiGroupList) Run(s *MockKubeAPIServer) error { + groupMap := make(map[string]*metav1.APIGroup) + for _, resource := range s.schema.resources { + group := groupMap[resource.Group] + if group == nil { + group = &metav1.APIGroup{Name: resource.Group} + groupMap[resource.Group] = group + } + + foundVersion := false + for _, version := range group.Versions { + if version.Version == resource.Version { + foundVersion = true + } + } + if !foundVersion { + group.Versions = append(group.Versions, metav1.GroupVersionForDiscovery{ + GroupVersion: resource.Group + "/" + resource.Version, + Version: resource.Version, + }) + } + } + + for _, group := range groupMap { + sort.Slice(group.Versions, func(i, j int) bool { + return group.Versions[i].Version < group.Versions[j].Version + }) + } + + var groupKeys []string + for key := range groupMap { + groupKeys = append(groupKeys, key) + } + sort.Strings(groupKeys) + + response := &metav1.APIGroupList{} + response.Kind = "APIGroupList" + response.APIVersion = "v1" + for _, groupKey := range groupKeys { + group := groupMap[groupKey] + // Assume preferred version is newest + group.PreferredVersion = group.Versions[len(group.Versions)-1] + response.Groups = append(response.Groups, *group) + } + return r.writeResponse(response) +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/apiresourcelist.go b/pkg/applylib/mocks/mockkubeapiserver/apiresourcelist.go new file mode 100644 index 0000000000..fc481f81be --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/apiresourcelist.go @@ -0,0 +1,49 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// apiResourceList is a request for api discovery, such as GET /apis/resourcemanager.cnrm.cloud.google.com/v1beta1 +type apiResourceList struct { + baseRequest + + Group string + Version string +} + +// Run serves the http request +func (req *apiResourceList) Run(s *MockKubeAPIServer) error { + gv := schema.GroupVersion{ + Group: req.Group, + Version: req.Version, + } + response := &metav1.APIResourceList{} + response.Kind = "APIResourceList" + response.APIVersion = "v1" + response.GroupVersion = gv.String() + for _, resource := range s.schema.resources { + if resource.Group != req.Group || resource.Version != req.Version { + continue + } + response.APIResources = append(response.APIResources, resource.APIResource) + } + return req.writeResponse(response) +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/apiserver.go b/pkg/applylib/mocks/mockkubeapiserver/apiserver.go new file mode 100644 index 0000000000..25517af7ce --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/apiserver.go @@ -0,0 +1,324 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "strings" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +func NewMockKubeAPIServer(addr string) (*MockKubeAPIServer, error) { + s := &MockKubeAPIServer{ + objects: make(map[schema.GroupResource]*objectList), + } + if addr == "" { + addr = ":http" + } + + s.httpServer = &http.Server{Addr: addr, Handler: s} + + return s, nil +} + +type MockKubeAPIServer struct { + httpServer *http.Server + listener net.Listener + + schema mockSchema + objects map[schema.GroupResource]*objectList +} + +type mockSchema struct { + resources []mockSchemaResource +} + +type mockSchemaResource struct { + metav1.APIResource +} + +func (s *MockKubeAPIServer) StartServing() (net.Addr, error) { + listener, err := net.Listen("tcp", s.httpServer.Addr) + if err != nil { + return nil, err + } + s.listener = listener + addr := listener.Addr() + go func() { + if err := s.httpServer.Serve(s.listener); err != nil { + if err != http.ErrServerClosed { + klog.Errorf("error serving: %v", err) + } + } + }() + return addr, nil +} + +func (s *MockKubeAPIServer) Stop() error { + return s.httpServer.Close() +} + +func (s *MockKubeAPIServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + klog.Infof("kubeapiserver request: %s %s", r.Method, r.URL) + + path := r.URL.Path + tokens := strings.Split(strings.Trim(path, "/"), "/") + + var req Request + + // matchedPath is bool if we recognized the path, but if we didn't build a req we should send StatusMethodNotAllowed instead of NotFound + var matchedPath bool + + if len(tokens) == 2 { + if tokens[0] == "api" && tokens[1] == "v1" { + matchedPath = true + + switch r.Method { + case http.MethodGet: + req = &apiResourceList{ + Group: "", + Version: "v1", + } + } + } + } + if len(tokens) == 1 { + if tokens[0] == "api" { + matchedPath = true + + switch r.Method { + case http.MethodGet: + req = &apiVersionsRequest{} + } + } + + if tokens[0] == "apis" { + matchedPath = true + switch r.Method { + case http.MethodGet: + req = &apiGroupList{} + } + } + } + + if len(tokens) == 3 { + if tokens[0] == "apis" { + matchedPath = true + switch r.Method { + case http.MethodGet: + req = &apiResourceList{ + Group: tokens[1], + Version: tokens[2], + } + } + } + } + + buildObjectRequest := func(common resourceRequestBase) { + switch r.Method { + case http.MethodGet: + req = &getResource{ + resourceRequestBase: common, + } + case http.MethodPatch: + req = &patchResource{ + resourceRequestBase: common, + } + case http.MethodPut: + req = &putResource{ + resourceRequestBase: common, + } + } + } + + if len(tokens) == 4 { + if tokens[0] == "api" { + buildObjectRequest(resourceRequestBase{ + Group: "", + Version: tokens[1], + Resource: tokens[2], + Name: tokens[3], + }) + matchedPath = true + } + } + if len(tokens) == 6 { + if tokens[0] == "api" && tokens[2] == "namespaces" { + buildObjectRequest(resourceRequestBase{ + Group: "", + Version: tokens[1], + Resource: tokens[4], + Namespace: tokens[3], + Name: tokens[5], + }) + matchedPath = true + } + } + if len(tokens) == 7 { + if tokens[0] == "apis" && tokens[3] == "namespaces" { + buildObjectRequest(resourceRequestBase{ + Group: tokens[1], + Version: tokens[2], + Namespace: tokens[4], + Resource: tokens[5], + Name: tokens[6], + }) + matchedPath = true + } + } + if len(tokens) == 8 { + if tokens[0] == "apis" && tokens[3] == "namespaces" { + buildObjectRequest(resourceRequestBase{ + Group: tokens[1], + Version: tokens[2], + Namespace: tokens[4], + Resource: tokens[5], + Name: tokens[6], + SubResource: tokens[7], + }) + matchedPath = true + } + } + + if req == nil { + if matchedPath { + klog.Warningf("method not allowed for %s %s", r.Method, r.URL) + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + } else { + klog.Warningf("404 for %s %s tokens=%#v", r.Method, r.URL, tokens) + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + } + return + } + + req.Init(w, r) + + err := req.Run(s) + if err != nil { + klog.Warningf("internal error for %s %s: %v", r.Method, r.URL, err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + } + +} + +type Request interface { + Run(s *MockKubeAPIServer) error + Init(w http.ResponseWriter, r *http.Request) +} + +// baseRequest is the base for our higher-level http requests +type baseRequest struct { + w http.ResponseWriter + r *http.Request +} + +func (b *baseRequest) Init(w http.ResponseWriter, r *http.Request) { + b.w = w + b.r = r +} + +func (r *baseRequest) writeResponse(obj interface{}) error { + b, err := json.Marshal(obj) + if err != nil { + return fmt.Errorf("error from json.Marshal on %T: %w", obj, err) + } + r.w.Header().Add("Content-Type", "application/json") + r.w.Header().Add("Cache-Control", "no-cache, private") + + if _, err := r.w.Write(b); err != nil { + // Too late to send error response + klog.Warningf("error writing http response: %w", err) + return nil + } + return nil +} + +func (r *baseRequest) writeErrorResponse(statusCode int) error { + klog.Warningf("404 for %s %s", r.r.Method, r.r.URL) + http.Error(r.w, http.StatusText(statusCode), statusCode) + + return nil +} + +// Add registers a type with the schema for the mock kubeapiserver +func (s *MockKubeAPIServer) Add(gvk schema.GroupVersionKind, resource string, scope meta.RESTScope) { + r := mockSchemaResource{ + APIResource: metav1.APIResource{ + Name: resource, + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind, + }, + } + if scope.Name() == meta.RESTScopeNameNamespace { + r.Namespaced = true + } + + s.schema.resources = append(s.schema.resources, r) +} + +// AddObject pre-creates an object +func (s *MockKubeAPIServer) AddObject(obj *unstructured.Unstructured) error { + gv, err := schema.ParseGroupVersion(obj.GetAPIVersion()) + if err != nil { + return fmt.Errorf("cannot parse apiVersion %q: %w", obj.GetAPIVersion(), err) + } + kind := obj.GetKind() + + id := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } + + for _, resource := range s.schema.resources { + if resource.Group != gv.Group || resource.Version != gv.Version { + continue + } + if resource.Kind != kind { + continue + } + + gr := schema.GroupResource{Group: resource.Group, Resource: resource.Name} + objects := s.objects[gr] + if objects == nil { + objects = &objectList{ + GroupResource: gr, + Objects: make(map[types.NamespacedName]*unstructured.Unstructured), + } + s.objects[gr] = objects + } + + objects.Objects[id] = obj + return nil + } + gvk := gv.WithKind(kind) + return fmt.Errorf("object group/version/kind %v not known", gvk) +} + +type objectList struct { + GroupResource schema.GroupResource + Objects map[types.NamespacedName]*unstructured.Unstructured +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/apiversions.go b/pkg/applylib/mocks/mockkubeapiserver/apiversions.go new file mode 100644 index 0000000000..3d3e0d0216 --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/apiversions.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// apiVersionsRequest is a wrapper around a request for core api version, such as GET /api +type apiVersionsRequest struct { + baseRequest +} + +// Run serves the GET /api endpoint +func (r *apiVersionsRequest) Run(s *MockKubeAPIServer) error { + versions := &metav1.APIVersions{} + versions.Kind = "APIVersions" + versions.Versions = []string{"v1"} + + return r.writeResponse(versions) +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/controllers.go b/pkg/applylib/mocks/mockkubeapiserver/controllers.go new file mode 100644 index 0000000000..b15adef6c5 --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/controllers.go @@ -0,0 +1,59 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func (s *MockKubeAPIServer) objectChanged(u *unstructured.Unstructured) { + gvk := u.GroupVersionKind() + + switch gvk.GroupKind() { + case schema.GroupKind{Kind: "Namespace"}: + s.namespaceChanged(u) + } +} + +func (s *MockKubeAPIServer) namespaceChanged(u *unstructured.Unstructured) { + // These changes seem to be done synchronously (similar to a mutating webhook) + labels := u.GetLabels() + name := u.GetName() + if labels["kubernetes.io/metadata.name"] != name { + if labels == nil { + labels = make(map[string]string) + } + labels["kubernetes.io/metadata.name"] = name + u.SetLabels(labels) + } + phase, _, _ := unstructured.NestedFieldNoCopy(u.Object, "status", "phase") + if phase != "Active" { + unstructured.SetNestedField(u.Object, "Active", "status", "phase") + } + found := false + finalizers, _, _ := unstructured.NestedSlice(u.Object, "spec", "finalizers") + for _, finalizer := range finalizers { + if finalizer == "kubernetes" { + found = true + } + } + if !found { + finalizers = append(finalizers, "kubernetes") + unstructured.SetNestedSlice(u.Object, finalizers, "spec", "finalizers") + } +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/getresource.go b/pkg/applylib/mocks/mockkubeapiserver/getresource.go new file mode 100644 index 0000000000..bc73a356d3 --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/getresource.go @@ -0,0 +1,67 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + "encoding/json" + "net/http" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// resourceRequestBase holds the common field for single-resource requests +type resourceRequestBase struct { + baseRequest + + Group string + Version string + Resource string + Namespace string + Name string + + SubResource string +} + +// getResource is a request to get a single resource +type getResource struct { + resourceRequestBase +} + +// Run serves the http request +func (req *getResource) Run(s *MockKubeAPIServer) error { + gr := schema.GroupResource{Group: req.Group, Resource: req.Resource} + + var object runtime.Object + objects := s.objects[gr] + if objects != nil { + object = objects.Objects[types.NamespacedName{Namespace: req.Namespace, Name: req.Name}] + } + if object == nil { + return req.writeErrorResponse(http.StatusNotFound) + } + + j, err := json.Marshal(object) + if err != nil { + klog.Warningf("object does not marshal: %v", err) + } else { + klog.Infof("returning %v", string(j)) + } + return req.writeResponse(object) +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/patchresource.go b/pkg/applylib/mocks/mockkubeapiserver/patchresource.go new file mode 100644 index 0000000000..d005df93f0 --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/patchresource.go @@ -0,0 +1,117 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + "fmt" + "io/ioutil" + "net/http" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// patchResource is a request to patch a single resource +type patchResource struct { + resourceRequestBase +} + +// Run serves the http request +func (req *patchResource) Run(s *MockKubeAPIServer) error { + gr := schema.GroupResource{Group: req.Group, Resource: req.Resource} + + id := types.NamespacedName{Namespace: req.Namespace, Name: req.Name} + var existing *unstructured.Unstructured + objects := s.objects[gr] + if objects != nil { + existing = objects.Objects[id] + } + + bodyBytes, err := ioutil.ReadAll(req.r.Body) + if err != nil { + return err + } + + body := &unstructured.Unstructured{} + if err := body.UnmarshalJSON(bodyBytes); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + // TODO: We need to implement patch properly + klog.Infof("patch request %#v", string(bodyBytes)) + + if existing == nil { + // TODO: Only if server-side-apply + if objects == nil { + objects = &objectList{ + GroupResource: gr, + Objects: make(map[types.NamespacedName]*unstructured.Unstructured), + } + s.objects[gr] = objects + } + + if req.SubResource != "" { + // TODO: Is this correct for server-side-apply? + return req.writeErrorResponse(http.StatusNotFound) + } + + patched := body + objects.Objects[id] = patched + s.objectChanged(patched) + return req.writeResponse(patched) + } + + if req.SubResource == "" { + if err := applyPatch(existing.Object, body.Object); err != nil { + klog.Warningf("error from patch: %v", err) + return err + } + } else { + // TODO: We need to implement put properly + return fmt.Errorf("unknown subresource %q", req.SubResource) + } + objects.Objects[id] = existing + s.objectChanged(existing) + return req.writeResponse(existing) +} + +func applyPatch(existing, patch map[string]interface{}) error { + for k, patchValue := range patch { + existingValue := existing[k] + switch patchValue := patchValue.(type) { + case string, int64: + existing[k] = patchValue + case map[string]interface{}: + if existingValue == nil { + existing[k] = patchValue + } else { + existingMap, ok := existingValue.(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected type mismatch, expected map got %T", existingValue) + } + if err := applyPatch(existingMap, patchValue); err != nil { + return err + } + } + default: + return fmt.Errorf("type %T not handled in patch", patchValue) + } + } + return nil +} diff --git a/pkg/applylib/mocks/mockkubeapiserver/putresource.go b/pkg/applylib/mocks/mockkubeapiserver/putresource.go new file mode 100644 index 0000000000..df88e9c1fc --- /dev/null +++ b/pkg/applylib/mocks/mockkubeapiserver/putresource.go @@ -0,0 +1,82 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mockkubeapiserver + +import ( + "fmt" + "io/ioutil" + "net/http" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// putResource is a request to get a single resource +type putResource struct { + resourceRequestBase +} + +// Run serves the http request +func (req *putResource) Run(s *MockKubeAPIServer) error { + gr := schema.GroupResource{Group: req.Group, Resource: req.Resource} + + id := types.NamespacedName{Namespace: req.Namespace, Name: req.Name} + + var existing runtime.Object + objects := s.objects[gr] + if objects != nil { + existing = objects.Objects[id] + } + if existing == nil { + return req.writeErrorResponse(http.StatusNotFound) + } + + bodyBytes, err := ioutil.ReadAll(req.r.Body) + if err != nil { + return err + } + + klog.Infof("put request %#v", string(bodyBytes)) + + body := &unstructured.Unstructured{} + if err := body.UnmarshalJSON(bodyBytes); err != nil { + return fmt.Errorf("failed to parse payload: %w", err) + } + + var updated *unstructured.Unstructured + + if req.SubResource == "" { + updated = body + } else if req.SubResource == "status" { + updated = existing.DeepCopyObject().(*unstructured.Unstructured) + newStatus := body.Object["status"] + if newStatus == nil { + // TODO: This might be allowed? + return fmt.Errorf("status not specified on status subresource update") + } + updated.Object["status"] = newStatus + } else { + // TODO: We need to implement put properly + return fmt.Errorf("unknown subresource %q", req.SubResource) + } + objects.Objects[id] = updated + s.objectChanged(updated) + return req.writeResponse(updated) +}