add the storageversion.Manager interface
Kubernetes-commit: 48361711a50d634a676a63ccac177b46f94ffe3b
This commit is contained in:
		
							parent
							
								
									4b8136093d
								
							
						
					
					
						commit
						63a8ee2ad7
					
				|  | @ -17,178 +17,203 @@ limitations under the License. | |||
| package storageversion | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 
 | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||
| 	apiserverclientset "k8s.io/apiserver/pkg/client/clientset_generated/clientset" | ||||
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/client-go/kubernetes" | ||||
| 	"k8s.io/client-go/rest" | ||||
| 	_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
 | ||||
| 	"k8s.io/klog" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
| 
 | ||||
| // ResourceInfo contains the information to register the resource to the
 | ||||
| // storage version API.
 | ||||
| type ResourceInfo struct { | ||||
| 	Resource metav1.APIResource | ||||
| 	// We use a standalone Group instead of reusing the Resource.Group
 | ||||
| 	// because Resource.Group is often omitted, see the comment on
 | ||||
| 	// Resource.Group for why it's omitted.
 | ||||
| 	Group                    string | ||||
| 	EncodingVersion          string | ||||
| 	DecodableVersions        []string | ||||
| 	GroupResource schema.GroupResource | ||||
| 
 | ||||
| 	EncodingVersion string | ||||
| 	// Used to calculate decodable versions. Can only be used after all
 | ||||
| 	// equivalent versions are registered by InstallREST.
 | ||||
| 	EquivalentResourceMapper runtime.EquivalentResourceRegistry | ||||
| } | ||||
| 
 | ||||
| // Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions.
 | ||||
| type Manager interface { | ||||
| 	// AddResourceInfo adds ResourceInfo to the manager.
 | ||||
| 	// AddResourceInfo records resources whose StorageVersions need updates
 | ||||
| 	AddResourceInfo(resources ...*ResourceInfo) | ||||
| 	// RemoveResourceInfo removes ResourceInfo from the manager.
 | ||||
| 	RemoveResourceInfo(r *ResourceInfo) | ||||
| 	// UpdatesPending returns if the StorageVersion of a resource is still wait to be updated.
 | ||||
| 	UpdatesPending(group, resource string) bool | ||||
| 
 | ||||
| 	// UpdateStorageVersions updates the StorageVersions.
 | ||||
| 	UpdateStorageVersions(loopbackClientConfig *rest.Config, apiserverID string) | ||||
| 	// Completed returns if updating StorageVersions has completed.
 | ||||
| 	// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
 | ||||
| 	UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string) | ||||
| 	// PendingUpdate returns true if the StorageVersion of the given resource is still pending update.
 | ||||
| 	PendingUpdate(gr schema.GroupResource) bool | ||||
| 	// LastUpdateError returns the last error hit when updating the storage version of the given resource.
 | ||||
| 	LastUpdateError(gr schema.GroupResource) error | ||||
| 	// Completed returns true if updating StorageVersions of all recorded resources has completed.
 | ||||
| 	Completed() bool | ||||
| } | ||||
| 
 | ||||
| var _ Manager = &DefaultManager{} | ||||
| var _ Manager = &defaultManager{} | ||||
| 
 | ||||
| // NewDefaultManager creates a new DefaultManager.
 | ||||
| func NewDefaultManager() *DefaultManager { | ||||
| 	s := &DefaultManager{} | ||||
| // defaultManager indicates if an apiserver has completed reporting its storage versions.
 | ||||
| type defaultManager struct { | ||||
| 	completed atomic.Value | ||||
| 
 | ||||
| 	mu sync.RWMutex | ||||
| 	// managedResourceInfos records the ResourceInfos whose StorageVersions will get updated in the next
 | ||||
| 	// UpdateStorageVersions call
 | ||||
| 	managedResourceInfos map[*ResourceInfo]struct{} | ||||
| 	// managedStatus records the update status of StorageVersion for each GroupResource. Since one
 | ||||
| 	// ResourceInfo may expand into multiple GroupResource (e.g. ingresses.networking.k8s.io and ingresses.extensions),
 | ||||
| 	// this map allows quick status lookup for a GroupResource, during API request handling.
 | ||||
| 	managedStatus map[schema.GroupResource]*updateStatus | ||||
| } | ||||
| 
 | ||||
| type updateStatus struct { | ||||
| 	done    bool | ||||
| 	lastErr error | ||||
| } | ||||
| 
 | ||||
| // NewDefaultManager creates a new defaultManager.
 | ||||
| func NewDefaultManager() Manager { | ||||
| 	s := &defaultManager{} | ||||
| 	s.completed.Store(false) | ||||
| 	s.groupResources = make(map[string]map[string]struct{}) | ||||
| 	s.resources = make(map[*ResourceInfo]struct{}) | ||||
| 	s.managedResourceInfos = make(map[*ResourceInfo]struct{}) | ||||
| 	s.managedStatus = make(map[schema.GroupResource]*updateStatus) | ||||
| 	return s | ||||
| } | ||||
| 
 | ||||
| // AddResourceInfo adds ResourceInfo to the manager.
 | ||||
| // This is not thread-safe. It is expected to be called when the apiserver is installing the endpoints, which is done serially.
 | ||||
| func (s *DefaultManager) AddResourceInfo(resources ...*ResourceInfo) { | ||||
| 	for _, r := range resources { | ||||
| 		s.resources[r] = struct{}{} | ||||
| 		s.addGroupResourceFor(r) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *DefaultManager) addGroupResourceFor(r *ResourceInfo) { | ||||
| 	gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(schema.GroupVersionResource{ | ||||
| 		Group:    r.Group, | ||||
| 		Resource: r.Resource.Name, | ||||
| 	}, "") | ||||
| 	for _, gvr := range gvrs { | ||||
| 		s.addGroupResource(gvr.Group, gvr.Resource) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *DefaultManager) addGroupResource(group, resource string) { | ||||
| 	if _, ok := s.groupResources[group]; !ok { | ||||
| 		s.groupResources[group] = make(map[string]struct{}) | ||||
| 	} | ||||
| 	s.groupResources[group][resource] = struct{}{} | ||||
| } | ||||
| 
 | ||||
| // RemoveResourceInfo removes ResourceInfo from the manager.
 | ||||
| // It is not safe to call this function concurrently with AddResourceInfo.
 | ||||
| func (s *DefaultManager) RemoveResourceInfo(r *ResourceInfo) { | ||||
| func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	delete(s.resources, r) | ||||
| 	s.removeGroupResourceFor(r) | ||||
| 	for _, r := range resources { | ||||
| 		s.managedResourceInfos[r] = struct{}{} | ||||
| 		s.addPendingManagedStatusLocked(r) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *DefaultManager) removeGroupResourceFor(r *ResourceInfo) { | ||||
| 	gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(schema.GroupVersionResource{ | ||||
| 		Group:    r.Group, | ||||
| 		Resource: r.Resource.Name, | ||||
| 	}, "") | ||||
| func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) { | ||||
| 	gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") | ||||
| 	for _, gvr := range gvrs { | ||||
| 		s.removeGroupResource(gvr.Group, gvr.Version) | ||||
| 		s.managedStatus[gvr.GroupResource()] = &updateStatus{} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *DefaultManager) removeGroupResource(group, resource string) { | ||||
| 	if _, ok := s.groupResources[group]; !ok { | ||||
| 		return | ||||
| 	} | ||||
| 	delete(s.groupResources[group], resource) | ||||
| 	if len(s.groupResources[group]) == 0 { | ||||
| 		delete(s.groupResources, group) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // UpdatesPending returns if the StorageVersion of a resource is still wait to be updated.
 | ||||
| func (s *DefaultManager) UpdatesPending(group, resource string) bool { | ||||
| 	s.mu.RLock() | ||||
| 	defer s.mu.RUnlock() | ||||
| 	if _, ok := s.groupResources[group]; !ok { | ||||
| 		return false | ||||
| 	} | ||||
| 	_, ok := s.groupResources[group][resource] | ||||
| 	return ok | ||||
| } | ||||
| 
 | ||||
| // DefaultManager indicates if the aggregator, kube-apiserver, and the
 | ||||
| // apiextensions-apiserver have completed reporting their storage versions.
 | ||||
| type DefaultManager struct { | ||||
| 	completed atomic.Value | ||||
| 
 | ||||
| 	mu             sync.RWMutex | ||||
| 	resources      map[*ResourceInfo]struct{} | ||||
| 	groupResources map[string]map[string]struct{} | ||||
| } | ||||
| 
 | ||||
| // setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore.
 | ||||
| func (s *DefaultManager) setComplete() { | ||||
| 	s.completed.Store(true) | ||||
| } | ||||
| 
 | ||||
| // Completed returns if updating StorageVersions has completed.
 | ||||
| func (s *DefaultManager) Completed() bool { | ||||
| 	return s.completed.Load().(bool) | ||||
| } | ||||
| 
 | ||||
| func decodableVersions(e runtime.EquivalentResourceRegistry, group string, resource string) []string { | ||||
| 	var versions []string | ||||
| 	decodingGVRs := e.EquivalentResourcesFor(schema.GroupVersionResource{ | ||||
| 		Group:    group, | ||||
| 		Resource: resource, | ||||
| 	}, "") | ||||
| 	for _, v := range decodingGVRs { | ||||
| 		versions = append(versions, v.GroupVersion().String()) | ||||
| 	} | ||||
| 	return versions | ||||
| } | ||||
| 
 | ||||
| // UpdateStorageVersions updates the StorageVersions. If the updates are
 | ||||
| // successful, following calls to Completed() returns true.
 | ||||
| func (s *DefaultManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string) { | ||||
| 	cfg := rest.AddUserAgent(loopbackClientConfig, "system:kube-apiserver") | ||||
| 	clientset, err := apiserverclientset.NewForConfig(cfg) | ||||
| // UpdateStorageVersions tries to update the StorageVersions of the recorded resources
 | ||||
| func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) { | ||||
| 	clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig) | ||||
| 	if err != nil { | ||||
| 		klog.Fatalf("failed to get clientset: %v", err) | ||||
| 		utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err)) | ||||
| 		return | ||||
| 	} | ||||
| 	sc := clientset.InternalV1alpha1().StorageVersions() | ||||
| 
 | ||||
| 	s.mu.RLock() | ||||
| 	resources := s.resources | ||||
| 	resources := make([]*ResourceInfo, len(s.managedResourceInfos)) | ||||
| 	for resource := range s.managedResourceInfos { | ||||
| 		resources = append(resources, resource) | ||||
| 	} | ||||
| 	s.mu.RUnlock() | ||||
| 	for r := range resources { | ||||
| 		r.DecodableVersions = decodableVersions(r.EquivalentResourceMapper, r.Group, r.Resource.Name) | ||||
| 		if err := updateStorageVersionFor(sc, serverID, r.Group+"."+r.Resource.Name, r.EncodingVersion, r.DecodableVersions); err != nil { | ||||
| 			klog.Fatalf("failed to update storage version for %v", r.Resource.Name) | ||||
| 			return | ||||
| 	hasFailure := false | ||||
| 	for _, r := range resources { | ||||
| 		dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource) | ||||
| 		if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil { | ||||
| 			utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err)) | ||||
| 			s.recordStatusFailure(r, err) | ||||
| 			hasFailure = true | ||||
| 			continue | ||||
| 		} | ||||
| 		klog.V(2).Infof("successfully updated storage version for %v", r.Resource.Name) | ||||
| 		s.RemoveResourceInfo(r) | ||||
| 		klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource) | ||||
| 		s.recordStatusSuccess(r) | ||||
| 	} | ||||
| 	if hasFailure { | ||||
| 		return | ||||
| 	} | ||||
| 	klog.V(2).Infof("storage version updates complete") | ||||
| 	s.setComplete() | ||||
| } | ||||
| 
 | ||||
| // recordStatusSuccess marks updated ResourceInfo as completed.
 | ||||
| func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.recordStatusSuccessLocked(r) | ||||
| } | ||||
| 
 | ||||
| func (s *defaultManager) recordStatusSuccessLocked(r *ResourceInfo) { | ||||
| 	gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") | ||||
| 	for _, gvr := range gvrs { | ||||
| 		s.recordSuccessGroupResourceLocked(gvr.GroupResource()) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *defaultManager) recordSuccessGroupResourceLocked(gr schema.GroupResource) { | ||||
| 	if _, ok := s.managedStatus[gr]; !ok { | ||||
| 		return | ||||
| 	} | ||||
| 	s.managedStatus[gr].done = true | ||||
| 	s.managedStatus[gr].lastErr = nil | ||||
| } | ||||
| 
 | ||||
| // recordStatusFailure records latest error updating ResourceInfo.
 | ||||
| func (s *defaultManager) recordStatusFailure(r *ResourceInfo, err error) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.recordStatusFailureLocked(r, err) | ||||
| } | ||||
| 
 | ||||
| func (s *defaultManager) recordStatusFailureLocked(r *ResourceInfo, err error) { | ||||
| 	gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") | ||||
| 	for _, gvr := range gvrs { | ||||
| 		s.recordErrorGroupResourceLocked(gvr.GroupResource(), err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *defaultManager) recordErrorGroupResourceLocked(gr schema.GroupResource, err error) { | ||||
| 	if _, ok := s.managedStatus[gr]; !ok { | ||||
| 		return | ||||
| 	} | ||||
| 	s.managedStatus[gr].lastErr = err | ||||
| } | ||||
| 
 | ||||
| // PendingUpdate returns if the StorageVersion of a resource is still wait to be updated.
 | ||||
| func (s *defaultManager) PendingUpdate(gr schema.GroupResource) bool { | ||||
| 	s.mu.RLock() | ||||
| 	defer s.mu.RUnlock() | ||||
| 	if _, ok := s.managedStatus[gr]; !ok { | ||||
| 		return false | ||||
| 	} | ||||
| 	return !s.managedStatus[gr].done | ||||
| } | ||||
| 
 | ||||
| // LastUpdateError returns the last error hit when updating the storage version of the given resource.
 | ||||
| func (s *defaultManager) LastUpdateError(gr schema.GroupResource) error { | ||||
| 	s.mu.RLock() | ||||
| 	defer s.mu.RUnlock() | ||||
| 	if _, ok := s.managedStatus[gr]; !ok { | ||||
| 		return fmt.Errorf("couldn't find managed status for %v", gr) | ||||
| 	} | ||||
| 	return s.managedStatus[gr].lastErr | ||||
| } | ||||
| 
 | ||||
| // setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore.
 | ||||
| func (s *defaultManager) setComplete() { | ||||
| 	s.completed.Store(true) | ||||
| } | ||||
| 
 | ||||
| // Completed returns if updating StorageVersions has completed.
 | ||||
| func (s *defaultManager) Completed() bool { | ||||
| 	return s.completed.Load().(bool) | ||||
| } | ||||
| 
 | ||||
| func decodableVersions(e runtime.EquivalentResourceRegistry, gr schema.GroupResource) []string { | ||||
| 	var versions []string | ||||
| 	decodingGVRs := e.EquivalentResourcesFor(gr.WithVersion(""), "") | ||||
| 	for _, v := range decodingGVRs { | ||||
| 		versions = append(versions, v.GroupVersion().String()) | ||||
| 	} | ||||
| 	return versions | ||||
| } | ||||
|  |  | |||
|  | @ -1,5 +1,5 @@ | |||
| /* | ||||
| Copyright 2019 The Kubernetes Authors. | ||||
| Copyright 2020 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
|  | @ -18,12 +18,14 @@ package storageversion | |||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"k8s.io/api/apiserverinternal/v1alpha1" | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apiserver/pkg/apis/apiserverinternal/v1alpha1" | ||||
| 	"k8s.io/klog" | ||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
| 
 | ||||
| // Client has the methods required to update the storage version.
 | ||||
|  | @ -33,33 +35,32 @@ type Client interface { | |||
| 	Get(context.Context, string, metav1.GetOptions) (*v1alpha1.StorageVersion, error) | ||||
| } | ||||
| 
 | ||||
| func setAgreedEncodingVersion(sv *v1alpha1.StorageVersion) { | ||||
| 	if len(sv.Status.ServerStorageVersions) == 0 { | ||||
| func setCommonEncodingVersion(sv *v1alpha1.StorageVersion) { | ||||
| 	if len(sv.Status.StorageVersions) == 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	firstVersion := sv.Status.ServerStorageVersions[0].EncodingVersion | ||||
| 	firstVersion := sv.Status.StorageVersions[0].EncodingVersion | ||||
| 	agreed := true | ||||
| 	for _, ssv := range sv.Status.ServerStorageVersions { | ||||
| 	for _, ssv := range sv.Status.StorageVersions { | ||||
| 		if ssv.EncodingVersion != firstVersion { | ||||
| 			agreed = false | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 	if agreed { | ||||
| 		sv.Status.AgreedEncodingVersion = &firstVersion | ||||
| 		sv.Status.CommonEncodingVersion = &firstVersion | ||||
| 	} else { | ||||
| 		sv.Status.AgreedEncodingVersion = nil | ||||
| 		sv.Status.CommonEncodingVersion = nil | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // updateStorageVersionFor updates the storage version object for the resource.
 | ||||
| // resource is of the format "<group>.<resource>".
 | ||||
| // TODO: split the resource parameter to two.
 | ||||
| func updateStorageVersionFor(c Client, apiserverID string, resource string, encodingVersion string, decodableVersions []string) error { | ||||
| func updateStorageVersionFor(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error { | ||||
| 	retries := 3 | ||||
| 	var retry int | ||||
| 	var err error | ||||
| 	for retry < retries { | ||||
| 		err = singleUpdate(c, apiserverID, resource, encodingVersion, decodableVersions) | ||||
| 		err = singleUpdate(c, apiserverID, gr, encodingVersion, decodableVersions) | ||||
| 		if err == nil { | ||||
| 			return nil | ||||
| 		} | ||||
|  | @ -68,7 +69,7 @@ func updateStorageVersionFor(c Client, apiserverID string, resource string, enco | |||
| 			continue | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			klog.Errorf("retry %d, failed to update storage version for %s: %v", retry, resource, err) | ||||
| 			klog.Errorf("retry %d, failed to update storage version for %v: %v", retry, gr, err) | ||||
| 			retry++ | ||||
| 			time.Sleep(1 * time.Second) | ||||
| 		} | ||||
|  | @ -76,41 +77,46 @@ func updateStorageVersionFor(c Client, apiserverID string, resource string, enco | |||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func singleUpdate(c Client, apiserverID, resource, encodingVersion string, decodableVersions []string) error { | ||||
| func singleUpdate(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error { | ||||
| 	shouldCreate := false | ||||
| 	sv, err := c.Get(context.TODO(), resource, metav1.GetOptions{}) | ||||
| 	name := fmt.Sprintf("%s.%s", gr.Group, gr.Resource) | ||||
| 	sv, err := c.Get(context.TODO(), name, metav1.GetOptions{}) | ||||
| 	if err != nil && !apierrors.IsNotFound(err) { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err != nil && apierrors.IsNotFound(err) { | ||||
| 	if apierrors.IsNotFound(err) { | ||||
| 		shouldCreate = true | ||||
| 		sv = &v1alpha1.StorageVersion{} | ||||
| 		sv.ObjectMeta.Name = resource | ||||
| 		sv.ObjectMeta.Name = name | ||||
| 	} | ||||
| 	localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions) | ||||
| 	updatedSV := localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions) | ||||
| 	if shouldCreate { | ||||
| 		_, err := c.Create(context.TODO(), sv, metav1.CreateOptions{}) | ||||
| 		_, err := c.Create(context.TODO(), updatedSV, metav1.CreateOptions{}) | ||||
| 		return err | ||||
| 	} | ||||
| 	_, err = c.Update(context.TODO(), sv, metav1.UpdateOptions{}) | ||||
| 	_, err = c.Update(context.TODO(), updatedSV, metav1.UpdateOptions{}) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func localUpdateStorageVersion(sv *v1alpha1.StorageVersion, apiserverID, encodingVersion string, decodableVersions []string) { | ||||
| // localUpdateStorageVersion updates the input storageversion with given server storageversion info.
 | ||||
| // The function updates the input storageversion in place.
 | ||||
| func localUpdateStorageVersion(sv *v1alpha1.StorageVersion, apiserverID, encodingVersion string, decodableVersions []string) *v1alpha1.StorageVersion { | ||||
| 	newSSV := v1alpha1.ServerStorageVersion{ | ||||
| 		APIServerID:       apiserverID, | ||||
| 		EncodingVersion:   encodingVersion, | ||||
| 		DecodableVersions: decodableVersions, | ||||
| 	} | ||||
| 	foundSSV := false | ||||
| 	for i, ssv := range sv.Status.ServerStorageVersions { | ||||
| 	for i, ssv := range sv.Status.StorageVersions { | ||||
| 		if ssv.APIServerID == apiserverID { | ||||
| 			sv.Status.ServerStorageVersions[i] = newSSV | ||||
| 			sv.Status.StorageVersions[i] = newSSV | ||||
| 			foundSSV = true | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 	if !foundSSV { | ||||
| 		sv.Status.ServerStorageVersions = append(sv.Status.ServerStorageVersions, newSSV) | ||||
| 		sv.Status.StorageVersions = append(sv.Status.StorageVersions, newSSV) | ||||
| 	} | ||||
| 	setAgreedEncodingVersion(sv) | ||||
| 	setCommonEncodingVersion(sv) | ||||
| 	return sv | ||||
| } | ||||
|  |  | |||
|  | @ -21,7 +21,7 @@ import ( | |||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	"k8s.io/apiserver/pkg/apis/apiserverinternal/v1alpha1" | ||||
| 	"k8s.io/api/apiserverinternal/v1alpha1" | ||||
| ) | ||||
| 
 | ||||
| func TestLocalUpdateStorageVersion(t *testing.T) { | ||||
|  | @ -56,46 +56,46 @@ func TestLocalUpdateStorageVersion(t *testing.T) { | |||
| 			old:    v1alpha1.StorageVersionStatus{}, | ||||
| 			newSSV: ssv1, | ||||
| 			expected: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1}, | ||||
| 				AgreedEncodingVersion: &v1, | ||||
| 				StorageVersions:       []v1alpha1.ServerStorageVersion{ssv1}, | ||||
| 				CommonEncodingVersion: &v1, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			old: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, | ||||
| 				AgreedEncodingVersion: &v1, | ||||
| 				StorageVersions:       []v1alpha1.ServerStorageVersion{ssv1, ssv2}, | ||||
| 				CommonEncodingVersion: &v1, | ||||
| 			}, | ||||
| 			newSSV: ssv3, | ||||
| 			expected: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, | ||||
| 				StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			old: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, | ||||
| 				AgreedEncodingVersion: &v1, | ||||
| 				StorageVersions:       []v1alpha1.ServerStorageVersion{ssv1, ssv2}, | ||||
| 				CommonEncodingVersion: &v1, | ||||
| 			}, | ||||
| 			newSSV: ssv4, | ||||
| 			expected: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv4}, | ||||
| 				AgreedEncodingVersion: &v1, | ||||
| 				StorageVersions:       []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv4}, | ||||
| 				CommonEncodingVersion: &v1, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			old: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, | ||||
| 				StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, | ||||
| 			}, | ||||
| 			newSSV: ssv4, | ||||
| 			expected: v1alpha1.StorageVersionStatus{ | ||||
| 				ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3, ssv4}, | ||||
| 				StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3, ssv4}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, tc := range tests { | ||||
| 		sv := &v1alpha1.StorageVersion{Status: tc.old} | ||||
| 		localUpdateStorageVersion(sv, tc.newSSV.APIServerID, tc.newSSV.EncodingVersion, tc.newSSV.DecodableVersions) | ||||
| 		if e, a := tc.expected, sv.Status; !reflect.DeepEqual(e, a) { | ||||
| 		updated := localUpdateStorageVersion(sv, tc.newSSV.APIServerID, tc.newSSV.EncodingVersion, tc.newSSV.DecodableVersions) | ||||
| 		if e, a := tc.expected, updated.Status; !reflect.DeepEqual(e, a) { | ||||
| 			t.Errorf("unexpected: %v", cmp.Diff(e, a)) | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue