apiserver: use SSA for apf configuration client
Kubernetes-commit: ecc640f1b40f14a894269e4b2ae6c80158626e93
This commit is contained in:
		
							parent
							
								
									9768ba70d8
								
							
						
					
					
						commit
						538eaa3c92
					
				|  | @ -20,7 +20,6 @@ import ( | |||
| 	"context" | ||||
| 	"crypto/sha256" | ||||
| 	"encoding/binary" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
|  | @ -34,7 +33,6 @@ import ( | |||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	apitypes "k8s.io/apimachinery/pkg/types" | ||||
| 	utilerrors "k8s.io/apimachinery/pkg/util/errors" | ||||
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
|  | @ -53,6 +51,7 @@ import ( | |||
| 	"k8s.io/utils/clock" | ||||
| 
 | ||||
| 	flowcontrol "k8s.io/api/flowcontrol/v1beta3" | ||||
| 	flowcontrolapplyconfiguration "k8s.io/client-go/applyconfigurations/flowcontrol/v1beta3" | ||||
| 	flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" | ||||
| 	flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta3" | ||||
| ) | ||||
|  | @ -420,19 +419,12 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior | |||
| 
 | ||||
| 		// if we are going to issue an update, be sure we track every name we update so we know if we update it too often.
 | ||||
| 		currResult.updatedItems.Insert(fsu.flowSchema.Name) | ||||
| 		patchBytes, err := makeFlowSchemaConditionPatch(fsu.condition) | ||||
| 		if err != nil { | ||||
| 			// should never happen because these conditions are created here and well formed
 | ||||
| 			panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) | ||||
| 		} | ||||
| 		if klogV := klog.V(4); klogV.Enabled() { | ||||
| 			klogV.Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s, diff: %s", | ||||
| 				cfgCtlr.name, fsu.condition, fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue), cmp.Diff(fsu.oldValue, fsu.condition)) | ||||
| 		} | ||||
| 		fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas() | ||||
| 		patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager} | ||||
| 		_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") | ||||
| 		if err != nil { | ||||
| 
 | ||||
| 		if err := apply(cfgCtlr.flowcontrolClient.FlowSchemas(), fsu, cfgCtlr.asFieldManager); err != nil { | ||||
| 			if apierrors.IsNotFound(err) { | ||||
| 				// This object has been deleted.  A notification is coming
 | ||||
| 				// and nothing more needs to be done here.
 | ||||
|  | @ -447,18 +439,27 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior | |||
| 	return suggestedDelay, utilerrors.NewAggregate(errs) | ||||
| } | ||||
| 
 | ||||
| // makeFlowSchemaConditionPatch takes in a condition and returns the patch status as a json.
 | ||||
| func makeFlowSchemaConditionPatch(condition flowcontrol.FlowSchemaCondition) ([]byte, error) { | ||||
| 	o := struct { | ||||
| 		Status flowcontrol.FlowSchemaStatus `json:"status"` | ||||
| 	}{ | ||||
| 		Status: flowcontrol.FlowSchemaStatus{ | ||||
| 			Conditions: []flowcontrol.FlowSchemaCondition{ | ||||
| 				condition, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	return json.Marshal(o) | ||||
| func apply(client flowcontrolclient.FlowSchemaInterface, fsu fsStatusUpdate, asFieldManager string) error { | ||||
| 	applyOptions := metav1.ApplyOptions{FieldManager: asFieldManager, Force: true} | ||||
| 
 | ||||
| 	// the condition field in fsStatusUpdate holds the new condition we want to update.
 | ||||
| 	// TODO: this will break when we have multiple conditions for a flowschema
 | ||||
| 	_, err := client.ApplyStatus(context.TODO(), toFlowSchemaApplyConfiguration(fsu), applyOptions) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func toFlowSchemaApplyConfiguration(fsUpdate fsStatusUpdate) *flowcontrolapplyconfiguration.FlowSchemaApplyConfiguration { | ||||
| 	condition := flowcontrolapplyconfiguration.FlowSchemaCondition(). | ||||
| 		WithType(fsUpdate.condition.Type). | ||||
| 		WithStatus(fsUpdate.condition.Status). | ||||
| 		WithReason(fsUpdate.condition.Reason). | ||||
| 		WithLastTransitionTime(fsUpdate.condition.LastTransitionTime). | ||||
| 		WithMessage(fsUpdate.condition.Message) | ||||
| 
 | ||||
| 	return flowcontrolapplyconfiguration.FlowSchema(fsUpdate.flowSchema.Name). | ||||
| 		WithStatus(flowcontrolapplyconfiguration.FlowSchemaStatus(). | ||||
| 			WithConditions(condition), | ||||
| 		) | ||||
| } | ||||
| 
 | ||||
| // shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed.
 | ||||
|  |  | |||
|  | @ -1,88 +0,0 @@ | |||
| /* | ||||
| Copyright 2021 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package flowcontrol | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	flowcontrol "k8s.io/api/flowcontrol/v1beta3" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| ) | ||||
| 
 | ||||
| func Test_configController_generatePatchBytes(t *testing.T) { | ||||
| 	now := time.Now().UTC() | ||||
| 	tests := []struct { | ||||
| 		name      string | ||||
| 		condition flowcontrol.FlowSchemaCondition | ||||
| 		want      []byte | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "check if only condition is parsed", | ||||
| 			condition: flowcontrol.FlowSchemaCondition{ | ||||
| 				Type:               flowcontrol.FlowSchemaConditionDangling, | ||||
| 				Status:             flowcontrol.ConditionTrue, | ||||
| 				Reason:             "test reason", | ||||
| 				Message:            "test none", | ||||
| 				LastTransitionTime: metav1.NewTime(now), | ||||
| 			}, | ||||
| 			want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test none"}]}}`, now.Format(time.RFC3339))), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "check when message has double quotes", | ||||
| 			condition: flowcontrol.FlowSchemaCondition{ | ||||
| 				Type:               flowcontrol.FlowSchemaConditionDangling, | ||||
| 				Status:             flowcontrol.ConditionTrue, | ||||
| 				Reason:             "test reason", | ||||
| 				Message:            `test ""none`, | ||||
| 				LastTransitionTime: metav1.NewTime(now), | ||||
| 			}, | ||||
| 			want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test \"\"none"}]}}`, now.Format(time.RFC3339))), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "check when message has a whitespace character that can be escaped", | ||||
| 			condition: flowcontrol.FlowSchemaCondition{ | ||||
| 				Type:               flowcontrol.FlowSchemaConditionDangling, | ||||
| 				Status:             flowcontrol.ConditionTrue, | ||||
| 				Reason:             "test reason", | ||||
| 				Message:            "test \u0009\u0009none", | ||||
| 				LastTransitionTime: metav1.NewTime(now), | ||||
| 			}, | ||||
| 			want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test \t\tnone"}]}}`, now.Format(time.RFC3339))), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "check when a few fields (message & lastTransitionTime) are missing", | ||||
| 			condition: flowcontrol.FlowSchemaCondition{ | ||||
| 				Type:   flowcontrol.FlowSchemaConditionDangling, | ||||
| 				Status: flowcontrol.ConditionTrue, | ||||
| 				Reason: "test reason", | ||||
| 			}, | ||||
| 			want: []byte(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":null,"reason":"test reason"}]}}`), | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			got, _ := makeFlowSchemaConditionPatch(tt.condition) | ||||
| 			if !reflect.DeepEqual(got, tt.want) { | ||||
| 				t.Errorf("makeFlowSchemaConditionPatch() got = %s, want %s; diff is %s", got, tt.want, cmp.Diff(tt.want, got)) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
		Loading…
	
		Reference in New Issue