Merge pull request #14030 from justinsb/applylib

Introduce library for applying objects
This commit is contained in:
Kubernetes Prow Robot 2022-08-20 07:21:35 -07:00 committed by GitHub
commit 3d84f3ca87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1693 additions and 80 deletions

View File

@ -18,19 +18,13 @@ package channels
import (
"context"
"encoding/json"
"fmt"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/applylib/applyset"
"k8s.io/kops/pkg/kubemanifest"
"k8s.io/kops/upup/pkg/fi"
)
type ClientApplier struct {
@ -45,89 +39,51 @@ func (p *ClientApplier) Apply(ctx context.Context, manifest []byte) error {
return fmt.Errorf("failed to parse objects: %w", err)
}
objectsByGVK := make(map[schema.GroupVersionKind][]*kubemanifest.Object)
for _, object := range objects {
key := object.GetNamespace() + "/" + object.GetName()
gv, err := schema.ParseGroupVersion(object.APIVersion())
if err != nil || gv.Version == "" {
return fmt.Errorf("failed to parse apiVersion %q in object %s", object.APIVersion(), key)
}
kind := object.Kind()
if kind == "" {
return fmt.Errorf("failed to find kind in object %s", key)
// TODO: Cache applyset for more efficient applying
patchOptions := metav1.PatchOptions{
FieldManager: "kops",
}
gvk := gv.WithKind(kind)
objectsByGVK[gvk] = append(objectsByGVK[gvk], object)
}
// We force to overcome errors like: Apply failed with 1 conflict: conflict with "kubectl-client-side-apply" using apps/v1: .spec.template.spec.containers[name="foo"].image
// TODO: How to handle this better? In a controller we don't have a choice and have to force eventually.
// But we could do something like try first without forcing, log the conflict if there is one, and then force.
// This would mean that if there was a loop we could log/detect it.
// We could even do things like back-off on the force apply.
force := true
patchOptions.Force = &force
var applyErrors error
for gvk := range objectsByGVK {
if err := p.applyObjectsOfKind(ctx, gvk, objectsByGVK[gvk]); err != nil {
applyErrors = multierr.Append(applyErrors, fmt.Errorf("failed to apply objects of kind %s: %w", gvk, err))
}
}
return applyErrors
}
func (p *ClientApplier) applyObjectsOfKind(ctx context.Context, gvk schema.GroupVersionKind, expectedObjects []*kubemanifest.Object) error {
klog.V(2).Infof("applying objects of kind: %v", gvk)
restMapping, err := p.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
s, err := applyset.New(applyset.Options{
RESTMapper: p.RESTMapper,
Client: p.Client,
PatchOptions: patchOptions,
})
if err != nil {
return fmt.Errorf("unable to find resource for %s: %w", gvk, err)
}
if err := p.applyObjects(ctx, restMapping, expectedObjects); err != nil {
return err
}
return nil
}
func (p *ClientApplier) applyObjects(ctx context.Context, restMapping *meta.RESTMapping, expectedObjects []*kubemanifest.Object) error {
var merr error
for _, expectedObject := range expectedObjects {
err := p.patchObject(ctx, restMapping, expectedObject)
merr = multierr.Append(merr, err)
}
return merr
}
func (p *ClientApplier) patchObject(ctx context.Context, restMapping *meta.RESTMapping, expectedObject *kubemanifest.Object) error {
gvr := restMapping.Resource
name := expectedObject.GetName()
namespace := expectedObject.GetNamespace()
key := namespace + "/" + name
var resource dynamic.ResourceInterface
if restMapping.Scope.Name() == meta.RESTScopeNameNamespace {
if namespace == "" {
return fmt.Errorf("namespace not set for namespace-scoped object %q", key)
}
resource = p.Client.Resource(gvr).Namespace(namespace)
} else {
if namespace != "" {
return fmt.Errorf("namespace was set for cluster-scoped object %q", key)
}
resource = p.Client.Resource(gvr)
}
obj := expectedObject.ToUnstructured()
jsonData, err := json.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marsal %q into json: %w", obj.GetName(), err)
}
{
_, err := resource.Patch(ctx, obj.GetName(), types.ApplyPatchType, jsonData, v1.PatchOptions{FieldManager: "kops", Force: fi.Bool(true)})
if err != nil {
return fmt.Errorf("failed to patch object %q: %w", obj.GetName(), err)
}
}
var applyableObjects []applyset.ApplyableObject
for _, object := range objects {
applyableObjects = append(applyableObjects, object)
}
if err := s.SetDesiredObjects(applyableObjects); err != nil {
return err
}
results, err := s.ApplyOnce(ctx)
if err != nil {
return fmt.Errorf("failed to apply objects: %w", err)
}
// TODO: Implement pruning
if !results.AllApplied() {
return fmt.Errorf("not all objects were applied")
}
// TODO: Check object health status
if !results.AllHealthy() {
return fmt.Errorf("not all objects were healthy")
}
return nil
}

View File

@ -0,0 +1,163 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"context"
"encoding/json"
"fmt"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
)
// ApplySet is a set of objects that we want to apply to the cluster.
//
// An ApplySet has a few cases which it tries to optimize for:
// * We can change the objects we're applying
// * We want to watch the objects we're applying / be notified of changes
// * We want to know when the objects we apply are "healthy"
// * We expose a "try once" method to better support running from a controller.
//
// TODO: Pluggable health functions.
// TODO: Pruning
type ApplySet struct {
// client is the dynamic kubernetes client used to apply objects to the k8s cluster.
client dynamic.Interface
// restMapper is used to map object kind to resources, and to know if objects are cluster-scoped.
restMapper meta.RESTMapper
// patchOptions holds the options used when applying, in particular the fieldManager
patchOptions metav1.PatchOptions
// mutex guards trackers
mutex sync.Mutex
// trackers is a (mutable) pointer to the (immutable) objectTrackerList, containing a list of objects we are applying.
trackers *objectTrackerList
}
// Options holds the parameters for building an ApplySet.
type Options struct {
// Client is the dynamic kubernetes client used to apply objects to the k8s cluster.
Client dynamic.Interface
// RESTMapper is used to map object kind to resources, and to know if objects are cluster-scoped.
RESTMapper meta.RESTMapper
// PatchOptions holds the options used when applying, in particular the fieldManager
PatchOptions metav1.PatchOptions
}
// New constructs a new ApplySet
func New(options Options) (*ApplySet, error) {
a := &ApplySet{
client: options.Client,
restMapper: options.RESTMapper,
patchOptions: options.PatchOptions,
}
a.trackers = &objectTrackerList{}
return a, nil
}
// SetDesiredObjects is used to replace the desired state of all the objects.
// Any objects not specified are removed from the "desired" set.
func (a *ApplySet) SetDesiredObjects(objects []ApplyableObject) error {
a.mutex.Lock()
defer a.mutex.Unlock()
newTrackers := a.trackers.setDesiredObjects(objects)
a.trackers = newTrackers
return nil
}
// ApplyOnce will make one attempt to apply all objects and observe their health.
// It does not wait for the objects to become healthy, but will report their health.
//
// TODO: Limit the amount of time this takes, particularly if we have thousands of objects.
//
// We don't _have_ to try to apply all objects if it is taking too long.
//
// TODO: We re-apply every object every iteration; we should be able to do better.
func (a *ApplySet) ApplyOnce(ctx context.Context) (*ApplyResults, error) {
// snapshot the state
a.mutex.Lock()
trackers := a.trackers
a.mutex.Unlock()
results := &ApplyResults{total: len(trackers.items)}
for i := range trackers.items {
tracker := &trackers.items[i]
obj := tracker.desired
name := obj.GetName()
ns := obj.GetNamespace()
gvk := obj.GroupVersionKind()
nn := types.NamespacedName{Namespace: ns, Name: name}
restMapping, err := a.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
results.applyError(gvk, nn, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err))
continue
}
gvr := restMapping.Resource
var dynamicResource dynamic.ResourceInterface
switch restMapping.Scope.Name() {
case meta.RESTScopeNameNamespace:
if ns == "" {
// TODO: Differentiate between server-fixable vs client-fixable errors?
results.applyError(gvk, nn, fmt.Errorf("namespace was not provided for namespace-scoped object %v", gvk))
continue
}
dynamicResource = a.client.Resource(gvr).Namespace(ns)
case meta.RESTScopeNameRoot:
if ns != "" {
// TODO: Differentiate between server-fixable vs client-fixable errors?
results.applyError(gvk, nn, fmt.Errorf("namespace %q was provided for cluster-scoped object %v", obj.GetNamespace(), gvk))
continue
}
dynamicResource = a.client.Resource(gvr)
default:
// Internal error ... this is panic-level
return nil, fmt.Errorf("unknown scope for gvk %s: %q", gvk, restMapping.Scope.Name())
}
j, err := json.Marshal(obj)
if err != nil {
// TODO: Differentiate between server-fixable vs client-fixable errors?
results.applyError(gvk, nn, fmt.Errorf("failed to marshal object to JSON: %w", err))
continue
}
lastApplied, err := dynamicResource.Patch(ctx, name, types.ApplyPatchType, j, a.patchOptions)
if err != nil {
results.applyError(gvk, nn, fmt.Errorf("error from apply: %w", err))
continue
}
tracker.lastApplied = lastApplied
results.applySuccess(gvk, nn)
tracker.isHealthy = isHealthy(lastApplied)
results.reportHealth(gvk, nn, tracker.isHealthy)
}
return results, nil
}

View File

@ -0,0 +1,128 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"path/filepath"
"strings"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kops/pkg/applylib/mocks"
"k8s.io/kops/pkg/testutils/golden"
"sigs.k8s.io/yaml"
)
func TestApplySet(t *testing.T) {
h := mocks.NewHarness(t)
existing := ``
apply := `
apiVersion: v1
kind: Namespace
metadata:
name: test-applyset
---
apiVersion: v1
kind: ConfigMap
metadata:
name: foo
namespace: test-applyset
data:
foo: bar
`
h.WithObjects(h.ParseObjects(existing)...)
applyObjects := h.ParseObjects(apply)
patchOptions := metav1.PatchOptions{
FieldManager: "kops",
}
force := true
patchOptions.Force = &force
s, err := New(Options{
RESTMapper: h.RESTMapper(),
Client: h.DynamicClient(),
PatchOptions: patchOptions,
})
if err != nil {
h.Fatalf("error building applyset object: %v", err)
}
var applyableObjects []ApplyableObject
for _, object := range applyObjects {
applyableObjects = append(applyableObjects, object)
}
if err := s.SetDesiredObjects(applyableObjects); err != nil {
h.Fatalf("failed to set desired objects: %v", err)
}
results, err := s.ApplyOnce(h.Ctx)
if err != nil {
h.Fatalf("failed to apply objects: %v", err)
}
// TODO: Implement pruning
if !results.AllApplied() {
h.Fatalf("not all objects were applied")
}
// TODO: Check object health status
if !results.AllHealthy() {
h.Fatalf("not all objects were healthy")
}
var actual []string
for _, object := range applyObjects {
id := types.NamespacedName{
Namespace: object.GetNamespace(),
Name: object.GetName(),
}
u := &unstructured.Unstructured{}
u.SetAPIVersion(object.GetAPIVersion())
u.SetKind(object.GetKind())
if err := h.Client().Get(h.Ctx, id, u); err != nil {
h.Fatalf("failed to get object %v: %v", id, err)
}
metadata := u.Object["metadata"].(map[string]interface{})
delete(metadata, "creationTimestamp")
delete(metadata, "managedFields")
delete(metadata, "resourceVersion")
delete(metadata, "uid")
y, err := yaml.Marshal(u)
if err != nil {
h.Fatalf("failed to marshal object %v: %v", id, err)
}
actual = append(actual, string(y))
}
testDir := filepath.Join("testdata", strings.ToLower(t.Name()))
golden.AssertMatchesFile(t, strings.Join(actual, "\n---\n"), filepath.Join(testDir, "expected.yaml"))
}

View File

@ -0,0 +1,89 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"encoding/json"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
)
// isHealthy reports whether the object should be considered "healthy"
// TODO: Replace with kstatus library
func isHealthy(u *unstructured.Unstructured) bool {
ready := true
statusConditions, found, err := unstructured.NestedFieldNoCopy(u.Object, "status", "conditions")
if err != nil || !found {
klog.Infof("status conditions not found for %s", u.GroupVersionKind())
return true
}
statusConditionsList, ok := statusConditions.([]interface{})
if !ok {
klog.Warningf("expected status.conditions to be list, got %T", statusConditions)
return true
}
for i := range statusConditionsList {
condition := statusConditionsList[i]
conditionMap, ok := condition.(map[string]interface{})
if !ok {
klog.Warningf("expected status.conditions[%d] to be map, got %T", i, condition)
return true
}
conditionType := ""
conditionStatus := ""
for k, v := range conditionMap {
switch k {
case "type":
s, ok := v.(string)
if !ok {
klog.Warningf("expected status.conditions[].type to be string, got %T", v)
} else {
conditionType = s
}
case "status":
s, ok := v.(string)
if !ok {
klog.Warningf("expected status.conditions[].status to be string, got %T", v)
} else {
conditionStatus = s
}
}
}
// TODO: Check conditionType?
switch conditionStatus {
case "True":
// ready
case "False":
j, _ := json.Marshal(condition)
klog.Infof("status.conditions indicates object is not ready: %v", string(j))
ready = false
case "":
klog.Warningf("ignoring status.conditions[] type %q with unknown status %q", conditionType, conditionStatus)
}
}
klog.Infof("isHealthy %s %s/%s => %v", u.GroupVersionKind(), u.GetNamespace(), u.GetName(), ready)
return ready
}

View File

@ -0,0 +1,37 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"encoding/json"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// ApplyableObject is implemented by objects that can be applied to the cluster.
// We don't need much, so this might allow for more efficient implementations in future.
type ApplyableObject interface {
// GroupVersionKind returns the GroupVersionKind structure describing the type of the object
GroupVersionKind() schema.GroupVersionKind
// GetNamespace returns the namespace of the object
GetNamespace() string
// GetName returns the name of the object
GetName() string
// The object should implement json marshalling
json.Marshaler
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
// ApplyResults contains the results of an Apply operation.
type ApplyResults struct {
total int
applySuccessCount int
applyFailCount int
healthyCount int
unhealthyCount int
}
// AllApplied is true if the desired state has been successfully applied for all objects.
// Note: you likely also want to check AllHealthy, if you want to be sure the objects are "ready".
func (r *ApplyResults) AllApplied() bool {
r.checkInvariants()
return r.applyFailCount == 0
}
// AllHealthy is true if all the objects have been applied and have converged to a "ready" state.
// Note that this is only meaningful if AllApplied is true.
func (r *ApplyResults) AllHealthy() bool {
r.checkInvariants()
return r.unhealthyCount == 0
}
// checkInvariants is an internal function that warns if the object doesn't match the expected invariants.
func (r *ApplyResults) checkInvariants() {
if r.total != (r.applySuccessCount + r.applyFailCount) {
klog.Warningf("consistency error (apply counts): %#v", r)
} else if r.total != (r.healthyCount + r.unhealthyCount) {
// This "invariant" only holds when all objects could be applied
klog.Warningf("consistency error (healthy counts): %#v", r)
}
}
// applyError records that the apply of an object failed with an error.
func (r *ApplyResults) applyError(gvk schema.GroupVersionKind, nn types.NamespacedName, err error) {
r.applyFailCount++
klog.Warningf("error from apply on %s %s: %v", gvk, nn, err)
}
// applySuccess records that an object was applied and this succeeded.
func (r *ApplyResults) applySuccess(gvk schema.GroupVersionKind, nn types.NamespacedName) {
r.applySuccessCount++
}
// reportHealth records the health of an object.
func (r *ApplyResults) reportHealth(gvk schema.GroupVersionKind, nn types.NamespacedName, isHealthy bool) {
if isHealthy {
r.healthyCount++
} else {
r.unhealthyCount++
}
}

View File

@ -0,0 +1,20 @@
apiVersion: v1
kind: Namespace
metadata:
labels:
kubernetes.io/metadata.name: test-applyset
name: test-applyset
spec:
finalizers:
- kubernetes
status:
phase: Active
---
apiVersion: v1
data:
foo: bar
kind: ConfigMap
metadata:
name: foo
namespace: test-applyset

View File

@ -0,0 +1,104 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applyset
import (
"reflect"
"k8s.io/apimachinery/pkg/runtime"
)
// objectTrackerList is a list of objectTrackers, containing the state of the objects we are trying to apply.
// objectTrackerList is immutable (though objectTracker is mutable); we copy-on-write when the list changes.
// TODO: Given objectTracker is mutable, should we just make objectTrackerList mutable?
type objectTrackerList struct {
items []objectTracker
}
// objectTracker tracks the state for a single object
type objectTracker struct {
desired ApplyableObject
lastApplied runtime.Object
desiredIsApplied bool
isHealthy bool
}
// objectKey is the key used in maps; we consider objects with the same GVKNN the same.
type objectKey struct {
Group string
Version string
Kind string
Namespace string
Name string
}
// computeKey returns the unique key for the object.
func computeKey(u ApplyableObject) objectKey {
gvk := u.GroupVersionKind()
return objectKey{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: u.GetNamespace(),
Name: u.GetName(),
}
}
// setDesiredObjects completely replaces the set of objects we are interested in.
// We aim to reuse the current state where it carries over.
// Because objectTrackerList is immutable, we copy-on-write to a new objectTrackerList and return it.
func (l *objectTrackerList) setDesiredObjects(objects []ApplyableObject) *objectTrackerList {
existingTrackers := make(map[objectKey]*objectTracker)
for i := range l.items {
tracker := &l.items[i]
key := computeKey(tracker.desired)
existingTrackers[key] = tracker
}
newList := &objectTrackerList{}
for _, obj := range objects {
key := computeKey(obj)
// TODO: Detect duplicate keys?
existingTracker := existingTrackers[key]
if existingTracker == nil {
newList.items = append(newList.items, objectTracker{
desired: obj,
lastApplied: nil,
desiredIsApplied: false,
isHealthy: false,
})
} else if reflect.DeepEqual(existingTracker.desired, obj) {
newList.items = append(newList.items, objectTracker{
desired: obj,
lastApplied: existingTracker.lastApplied,
desiredIsApplied: existingTracker.desiredIsApplied,
isHealthy: existingTracker.isHealthy,
})
} else {
newList.items = append(newList.items, objectTracker{
desired: obj,
lastApplied: existingTracker.lastApplied,
desiredIsApplied: false,
isHealthy: existingTracker.isHealthy,
})
}
}
return newList
}

21
pkg/applylib/doc.go Normal file
View File

@ -0,0 +1,21 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applylib
// Package applylib implements a helper library for applying a set of objects to a cluster.
// This functionality is needed across multiple projects, and we are pursuing a "copy-and-paste"
// reuse strategy while we decide what functionality is needed and where this library should live.

View File

@ -0,0 +1,200 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
"bytes"
"context"
"flag"
"io"
"strings"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
yamlserializer "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/kops/pkg/applylib/mocks/mockkubeapiserver"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var kubeconfig = flag.String("kubeconfig", "", "set to use a real kube-apiserver")
type Harness struct {
*testing.T
k8s *mockkubeapiserver.MockKubeAPIServer
restConfig *rest.Config
restMapper *restmapper.DeferredDiscoveryRESTMapper
Scheme *runtime.Scheme
Ctx context.Context
client client.Client
}
func NewHarness(t *testing.T) *Harness {
h := &Harness{
T: t,
Scheme: runtime.NewScheme(),
Ctx: context.Background(),
}
corev1.AddToScheme(h.Scheme)
t.Cleanup(h.Stop)
return h
}
func (h *Harness) ParseObjects(y string) []*unstructured.Unstructured {
t := h.T
var objects []*unstructured.Unstructured
decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(y)), 100)
for {
var rawObj runtime.RawExtension
if err := decoder.Decode(&rawObj); err != nil {
if err != io.EOF {
t.Fatalf("error decoding yaml: %v", err)
}
break
}
m, _, err := yamlserializer.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil)
if err != nil {
t.Fatalf("error decoding yaml: %v", err)
}
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(m)
if err != nil {
t.Fatalf("error parsing object: %v", err)
}
unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap}
objects = append(objects, unstructuredObj)
}
return objects
}
func (h *Harness) WithObjects(initObjs ...*unstructured.Unstructured) {
if *kubeconfig == "" {
k8s, err := mockkubeapiserver.NewMockKubeAPIServer(":0")
if err != nil {
h.Fatalf("error building mock kube-apiserver: %v", err)
}
h.k8s = k8s
// TODO: Discover from scheme?
k8s.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}, "namespaces", meta.RESTScopeRoot)
k8s.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"}, "secrets", meta.RESTScopeNamespace)
k8s.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, "configmaps", meta.RESTScopeNamespace)
addr, err := k8s.StartServing()
if err != nil {
h.Errorf("error starting mock kube-apiserver: %v", err)
}
h.restConfig = &rest.Config{
Host: addr.String(),
}
} else {
kubeconfigPath := *kubeconfig
if strings.HasPrefix(kubeconfigPath, "~/") {
homeDir := homedir.HomeDir()
kubeconfigPath = strings.Replace(kubeconfigPath, "~/", homeDir+"/", 1)
}
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
h.Fatalf("error building kubeconfig for %q: %v", kubeconfigPath, err)
}
h.restConfig = restConfig
}
client, err := client.New(h.RESTConfig(), client.Options{})
if err != nil {
h.Fatalf("error building client: %v", err)
}
for _, obj := range initObjs {
if err := client.Create(h.Ctx, obj); err != nil {
h.Errorf("error creating object %v: %v", obj, err)
}
}
h.client = client
}
func (h *Harness) Stop() {
if h.k8s != nil {
if err := h.k8s.Stop(); err != nil {
h.Errorf("error closing mock kube-apiserver: %v", err)
}
}
}
func (h *Harness) DynamicClient() dynamic.Interface {
dynamicClient, err := dynamic.NewForConfig(h.RESTConfig())
if err != nil {
h.Fatalf("error building dynamicClient: %v", err)
}
return dynamicClient
}
func (h *Harness) Client() client.Client {
if h.client == nil {
h.Fatalf("must call Start() before Client()")
}
return h.client
}
func (h *Harness) RESTConfig() *rest.Config {
if h.restConfig == nil {
h.Fatalf("cannot call RESTConfig before Start")
}
return h.restConfig
}
func (h *Harness) RESTMapper() *restmapper.DeferredDiscoveryRESTMapper {
if h.restMapper == nil {
// discoveryClient, err := discovery.NewDiscoveryClientForConfig(h.RESTConfig())
// if err != nil {
// h.Fatalf("error building discovery client: %")
// }
// TODO: Use memory cache or simplified rest mapper
discoveryClient, err := disk.NewCachedDiscoveryClientForConfig(h.RESTConfig(), "/home/justinsb/tmp/discovery", "", time.Minute)
if err != nil {
h.Fatalf("error building discovery client: %v", err)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
h.restMapper = restMapper
}
return h.restMapper
}

View File

@ -0,0 +1,76 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
"sort"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// apiGroupList is a request for api discovery, such as GET /apis
type apiGroupList struct {
baseRequest
}
// Run serves the GET /apis endpoint
func (r *apiGroupList) Run(s *MockKubeAPIServer) error {
groupMap := make(map[string]*metav1.APIGroup)
for _, resource := range s.schema.resources {
group := groupMap[resource.Group]
if group == nil {
group = &metav1.APIGroup{Name: resource.Group}
groupMap[resource.Group] = group
}
foundVersion := false
for _, version := range group.Versions {
if version.Version == resource.Version {
foundVersion = true
}
}
if !foundVersion {
group.Versions = append(group.Versions, metav1.GroupVersionForDiscovery{
GroupVersion: resource.Group + "/" + resource.Version,
Version: resource.Version,
})
}
}
for _, group := range groupMap {
sort.Slice(group.Versions, func(i, j int) bool {
return group.Versions[i].Version < group.Versions[j].Version
})
}
var groupKeys []string
for key := range groupMap {
groupKeys = append(groupKeys, key)
}
sort.Strings(groupKeys)
response := &metav1.APIGroupList{}
response.Kind = "APIGroupList"
response.APIVersion = "v1"
for _, groupKey := range groupKeys {
group := groupMap[groupKey]
// Assume preferred version is newest
group.PreferredVersion = group.Versions[len(group.Versions)-1]
response.Groups = append(response.Groups, *group)
}
return r.writeResponse(response)
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// apiResourceList is a request for api discovery, such as GET /apis/resourcemanager.cnrm.cloud.google.com/v1beta1
type apiResourceList struct {
baseRequest
Group string
Version string
}
// Run serves the http request
func (req *apiResourceList) Run(s *MockKubeAPIServer) error {
gv := schema.GroupVersion{
Group: req.Group,
Version: req.Version,
}
response := &metav1.APIResourceList{}
response.Kind = "APIResourceList"
response.APIVersion = "v1"
response.GroupVersion = gv.String()
for _, resource := range s.schema.resources {
if resource.Group != req.Group || resource.Version != req.Version {
continue
}
response.APIResources = append(response.APIResources, resource.APIResource)
}
return req.writeResponse(response)
}

View File

@ -0,0 +1,324 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
func NewMockKubeAPIServer(addr string) (*MockKubeAPIServer, error) {
s := &MockKubeAPIServer{
objects: make(map[schema.GroupResource]*objectList),
}
if addr == "" {
addr = ":http"
}
s.httpServer = &http.Server{Addr: addr, Handler: s}
return s, nil
}
type MockKubeAPIServer struct {
httpServer *http.Server
listener net.Listener
schema mockSchema
objects map[schema.GroupResource]*objectList
}
type mockSchema struct {
resources []mockSchemaResource
}
type mockSchemaResource struct {
metav1.APIResource
}
func (s *MockKubeAPIServer) StartServing() (net.Addr, error) {
listener, err := net.Listen("tcp", s.httpServer.Addr)
if err != nil {
return nil, err
}
s.listener = listener
addr := listener.Addr()
go func() {
if err := s.httpServer.Serve(s.listener); err != nil {
if err != http.ErrServerClosed {
klog.Errorf("error serving: %v", err)
}
}
}()
return addr, nil
}
func (s *MockKubeAPIServer) Stop() error {
return s.httpServer.Close()
}
func (s *MockKubeAPIServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
klog.Infof("kubeapiserver request: %s %s", r.Method, r.URL)
path := r.URL.Path
tokens := strings.Split(strings.Trim(path, "/"), "/")
var req Request
// matchedPath is bool if we recognized the path, but if we didn't build a req we should send StatusMethodNotAllowed instead of NotFound
var matchedPath bool
if len(tokens) == 2 {
if tokens[0] == "api" && tokens[1] == "v1" {
matchedPath = true
switch r.Method {
case http.MethodGet:
req = &apiResourceList{
Group: "",
Version: "v1",
}
}
}
}
if len(tokens) == 1 {
if tokens[0] == "api" {
matchedPath = true
switch r.Method {
case http.MethodGet:
req = &apiVersionsRequest{}
}
}
if tokens[0] == "apis" {
matchedPath = true
switch r.Method {
case http.MethodGet:
req = &apiGroupList{}
}
}
}
if len(tokens) == 3 {
if tokens[0] == "apis" {
matchedPath = true
switch r.Method {
case http.MethodGet:
req = &apiResourceList{
Group: tokens[1],
Version: tokens[2],
}
}
}
}
buildObjectRequest := func(common resourceRequestBase) {
switch r.Method {
case http.MethodGet:
req = &getResource{
resourceRequestBase: common,
}
case http.MethodPatch:
req = &patchResource{
resourceRequestBase: common,
}
case http.MethodPut:
req = &putResource{
resourceRequestBase: common,
}
}
}
if len(tokens) == 4 {
if tokens[0] == "api" {
buildObjectRequest(resourceRequestBase{
Group: "",
Version: tokens[1],
Resource: tokens[2],
Name: tokens[3],
})
matchedPath = true
}
}
if len(tokens) == 6 {
if tokens[0] == "api" && tokens[2] == "namespaces" {
buildObjectRequest(resourceRequestBase{
Group: "",
Version: tokens[1],
Resource: tokens[4],
Namespace: tokens[3],
Name: tokens[5],
})
matchedPath = true
}
}
if len(tokens) == 7 {
if tokens[0] == "apis" && tokens[3] == "namespaces" {
buildObjectRequest(resourceRequestBase{
Group: tokens[1],
Version: tokens[2],
Namespace: tokens[4],
Resource: tokens[5],
Name: tokens[6],
})
matchedPath = true
}
}
if len(tokens) == 8 {
if tokens[0] == "apis" && tokens[3] == "namespaces" {
buildObjectRequest(resourceRequestBase{
Group: tokens[1],
Version: tokens[2],
Namespace: tokens[4],
Resource: tokens[5],
Name: tokens[6],
SubResource: tokens[7],
})
matchedPath = true
}
}
if req == nil {
if matchedPath {
klog.Warningf("method not allowed for %s %s", r.Method, r.URL)
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
} else {
klog.Warningf("404 for %s %s tokens=%#v", r.Method, r.URL, tokens)
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
return
}
req.Init(w, r)
err := req.Run(s)
if err != nil {
klog.Warningf("internal error for %s %s: %v", r.Method, r.URL, err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}
}
type Request interface {
Run(s *MockKubeAPIServer) error
Init(w http.ResponseWriter, r *http.Request)
}
// baseRequest is the base for our higher-level http requests
type baseRequest struct {
w http.ResponseWriter
r *http.Request
}
func (b *baseRequest) Init(w http.ResponseWriter, r *http.Request) {
b.w = w
b.r = r
}
func (r *baseRequest) writeResponse(obj interface{}) error {
b, err := json.Marshal(obj)
if err != nil {
return fmt.Errorf("error from json.Marshal on %T: %w", obj, err)
}
r.w.Header().Add("Content-Type", "application/json")
r.w.Header().Add("Cache-Control", "no-cache, private")
if _, err := r.w.Write(b); err != nil {
// Too late to send error response
klog.Warningf("error writing http response: %w", err)
return nil
}
return nil
}
func (r *baseRequest) writeErrorResponse(statusCode int) error {
klog.Warningf("404 for %s %s", r.r.Method, r.r.URL)
http.Error(r.w, http.StatusText(statusCode), statusCode)
return nil
}
// Add registers a type with the schema for the mock kubeapiserver
func (s *MockKubeAPIServer) Add(gvk schema.GroupVersionKind, resource string, scope meta.RESTScope) {
r := mockSchemaResource{
APIResource: metav1.APIResource{
Name: resource,
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
},
}
if scope.Name() == meta.RESTScopeNameNamespace {
r.Namespaced = true
}
s.schema.resources = append(s.schema.resources, r)
}
// AddObject pre-creates an object
func (s *MockKubeAPIServer) AddObject(obj *unstructured.Unstructured) error {
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
if err != nil {
return fmt.Errorf("cannot parse apiVersion %q: %w", obj.GetAPIVersion(), err)
}
kind := obj.GetKind()
id := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
for _, resource := range s.schema.resources {
if resource.Group != gv.Group || resource.Version != gv.Version {
continue
}
if resource.Kind != kind {
continue
}
gr := schema.GroupResource{Group: resource.Group, Resource: resource.Name}
objects := s.objects[gr]
if objects == nil {
objects = &objectList{
GroupResource: gr,
Objects: make(map[types.NamespacedName]*unstructured.Unstructured),
}
s.objects[gr] = objects
}
objects.Objects[id] = obj
return nil
}
gvk := gv.WithKind(kind)
return fmt.Errorf("object group/version/kind %v not known", gvk)
}
type objectList struct {
GroupResource schema.GroupResource
Objects map[types.NamespacedName]*unstructured.Unstructured
}

View File

@ -0,0 +1,33 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// apiVersionsRequest is a wrapper around a request for core api version, such as GET /api
type apiVersionsRequest struct {
baseRequest
}
// Run serves the GET /api endpoint
func (r *apiVersionsRequest) Run(s *MockKubeAPIServer) error {
versions := &metav1.APIVersions{}
versions.Kind = "APIVersions"
versions.Versions = []string{"v1"}
return r.writeResponse(versions)
}

View File

@ -0,0 +1,59 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func (s *MockKubeAPIServer) objectChanged(u *unstructured.Unstructured) {
gvk := u.GroupVersionKind()
switch gvk.GroupKind() {
case schema.GroupKind{Kind: "Namespace"}:
s.namespaceChanged(u)
}
}
func (s *MockKubeAPIServer) namespaceChanged(u *unstructured.Unstructured) {
// These changes seem to be done synchronously (similar to a mutating webhook)
labels := u.GetLabels()
name := u.GetName()
if labels["kubernetes.io/metadata.name"] != name {
if labels == nil {
labels = make(map[string]string)
}
labels["kubernetes.io/metadata.name"] = name
u.SetLabels(labels)
}
phase, _, _ := unstructured.NestedFieldNoCopy(u.Object, "status", "phase")
if phase != "Active" {
unstructured.SetNestedField(u.Object, "Active", "status", "phase")
}
found := false
finalizers, _, _ := unstructured.NestedSlice(u.Object, "spec", "finalizers")
for _, finalizer := range finalizers {
if finalizer == "kubernetes" {
found = true
}
}
if !found {
finalizers = append(finalizers, "kubernetes")
unstructured.SetNestedSlice(u.Object, finalizers, "spec", "finalizers")
}
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
"encoding/json"
"net/http"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
// resourceRequestBase holds the common field for single-resource requests
type resourceRequestBase struct {
baseRequest
Group string
Version string
Resource string
Namespace string
Name string
SubResource string
}
// getResource is a request to get a single resource
type getResource struct {
resourceRequestBase
}
// Run serves the http request
func (req *getResource) Run(s *MockKubeAPIServer) error {
gr := schema.GroupResource{Group: req.Group, Resource: req.Resource}
var object runtime.Object
objects := s.objects[gr]
if objects != nil {
object = objects.Objects[types.NamespacedName{Namespace: req.Namespace, Name: req.Name}]
}
if object == nil {
return req.writeErrorResponse(http.StatusNotFound)
}
j, err := json.Marshal(object)
if err != nil {
klog.Warningf("object does not marshal: %v", err)
} else {
klog.Infof("returning %v", string(j))
}
return req.writeResponse(object)
}

View File

@ -0,0 +1,117 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
"fmt"
"io/ioutil"
"net/http"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
// patchResource is a request to patch a single resource
type patchResource struct {
resourceRequestBase
}
// Run serves the http request
func (req *patchResource) Run(s *MockKubeAPIServer) error {
gr := schema.GroupResource{Group: req.Group, Resource: req.Resource}
id := types.NamespacedName{Namespace: req.Namespace, Name: req.Name}
var existing *unstructured.Unstructured
objects := s.objects[gr]
if objects != nil {
existing = objects.Objects[id]
}
bodyBytes, err := ioutil.ReadAll(req.r.Body)
if err != nil {
return err
}
body := &unstructured.Unstructured{}
if err := body.UnmarshalJSON(bodyBytes); err != nil {
return fmt.Errorf("failed to parse payload: %w", err)
}
// TODO: We need to implement patch properly
klog.Infof("patch request %#v", string(bodyBytes))
if existing == nil {
// TODO: Only if server-side-apply
if objects == nil {
objects = &objectList{
GroupResource: gr,
Objects: make(map[types.NamespacedName]*unstructured.Unstructured),
}
s.objects[gr] = objects
}
if req.SubResource != "" {
// TODO: Is this correct for server-side-apply?
return req.writeErrorResponse(http.StatusNotFound)
}
patched := body
objects.Objects[id] = patched
s.objectChanged(patched)
return req.writeResponse(patched)
}
if req.SubResource == "" {
if err := applyPatch(existing.Object, body.Object); err != nil {
klog.Warningf("error from patch: %v", err)
return err
}
} else {
// TODO: We need to implement put properly
return fmt.Errorf("unknown subresource %q", req.SubResource)
}
objects.Objects[id] = existing
s.objectChanged(existing)
return req.writeResponse(existing)
}
func applyPatch(existing, patch map[string]interface{}) error {
for k, patchValue := range patch {
existingValue := existing[k]
switch patchValue := patchValue.(type) {
case string, int64:
existing[k] = patchValue
case map[string]interface{}:
if existingValue == nil {
existing[k] = patchValue
} else {
existingMap, ok := existingValue.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected type mismatch, expected map got %T", existingValue)
}
if err := applyPatch(existingMap, patchValue); err != nil {
return err
}
}
default:
return fmt.Errorf("type %T not handled in patch", patchValue)
}
}
return nil
}

View File

@ -0,0 +1,82 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockkubeapiserver
import (
"fmt"
"io/ioutil"
"net/http"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
// putResource is a request to get a single resource
type putResource struct {
resourceRequestBase
}
// Run serves the http request
func (req *putResource) Run(s *MockKubeAPIServer) error {
gr := schema.GroupResource{Group: req.Group, Resource: req.Resource}
id := types.NamespacedName{Namespace: req.Namespace, Name: req.Name}
var existing runtime.Object
objects := s.objects[gr]
if objects != nil {
existing = objects.Objects[id]
}
if existing == nil {
return req.writeErrorResponse(http.StatusNotFound)
}
bodyBytes, err := ioutil.ReadAll(req.r.Body)
if err != nil {
return err
}
klog.Infof("put request %#v", string(bodyBytes))
body := &unstructured.Unstructured{}
if err := body.UnmarshalJSON(bodyBytes); err != nil {
return fmt.Errorf("failed to parse payload: %w", err)
}
var updated *unstructured.Unstructured
if req.SubResource == "" {
updated = body
} else if req.SubResource == "status" {
updated = existing.DeepCopyObject().(*unstructured.Unstructured)
newStatus := body.Object["status"]
if newStatus == nil {
// TODO: This might be allowed?
return fmt.Errorf("status not specified on status subresource update")
}
updated.Object["status"] = newStatus
} else {
// TODO: We need to implement put properly
return fmt.Errorf("unknown subresource %q", req.SubResource)
}
objects.Objects[id] = updated
s.objectChanged(updated)
return req.writeResponse(updated)
}

View File

@ -18,6 +18,7 @@ package kubemanifest
import (
"bytes"
"encoding/json"
"fmt"
"strings"
@ -44,6 +45,7 @@ func (o *Object) ToUnstructured() *unstructured.Unstructured {
return &unstructured.Unstructured{Object: o.data}
}
// GroupVersionKind returns the group/version/kind information for the object
func (o *Object) GroupVersionKind() schema.GroupVersionKind {
return o.ToUnstructured().GroupVersionKind()
}
@ -126,7 +128,15 @@ func (l ObjectList) ToYAML() ([]byte, error) {
func (m *Object) ToYAML() ([]byte, error) {
b, err := yaml.Marshal(m.data)
if err != nil {
return nil, fmt.Errorf("error marshaling manifest to yaml: %v", err)
return nil, fmt.Errorf("error marshaling manifest to yaml: %w", err)
}
return b, nil
}
func (m *Object) MarshalJSON() ([]byte, error) {
b, err := json.Marshal(m.data)
if err != nil {
return nil, fmt.Errorf("error marshaling manifest to json: %w", err)
}
return b, nil
}