diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 890271282..2accba46e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -672,15 +672,15 @@ }, { "ImportPath": "k8s.io/api", - "Rev": "18112a7b933b" + "Rev": "2c3c141c931c" }, { "ImportPath": "k8s.io/apimachinery", - "Rev": "10b38829b621" + "Rev": "d4f471b82f0a" }, { "ImportPath": "k8s.io/client-go", - "Rev": "758467711e07" + "Rev": "5682372f3538" }, { "ImportPath": "k8s.io/component-base", diff --git a/go.mod b/go.mod index a7790ad3d..5ae7be8e0 100644 --- a/go.mod +++ b/go.mod @@ -41,9 +41,9 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/square/go-jose.v2 v2.2.2 gopkg.in/yaml.v2 v2.2.8 - k8s.io/api v0.0.0-20201003235837-18112a7b933b - k8s.io/apimachinery v0.0.0-20201003235655-10b38829b621 - k8s.io/client-go v0.0.0-20201004000108-758467711e07 + k8s.io/api v0.0.0-20201005155906-2c3c141c931c + k8s.io/apimachinery v0.0.0-20201006035708-d4f471b82f0a + k8s.io/client-go v0.0.0-20201007120136-5682372f3538 k8s.io/component-base v0.0.0-20201004000625-609bde980a40 k8s.io/klog/v2 v2.2.0 k8s.io/kube-openapi v0.0.0-20200923155610-8b5066479488 @@ -54,8 +54,8 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20201003235837-18112a7b933b - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20201003235655-10b38829b621 - k8s.io/client-go => k8s.io/client-go v0.0.0-20201004000108-758467711e07 + k8s.io/api => k8s.io/api v0.0.0-20201005155906-2c3c141c931c + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20201006035708-d4f471b82f0a + k8s.io/client-go => k8s.io/client-go v0.0.0-20201007120136-5682372f3538 k8s.io/component-base => k8s.io/component-base v0.0.0-20201004000625-609bde980a40 ) diff --git a/go.sum b/go.sum index f976ea668..84e7a4849 100644 --- a/go.sum +++ b/go.sum @@ -502,9 +502,9 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= -k8s.io/api v0.0.0-20201003235837-18112a7b933b/go.mod h1:aE9OxkIvJAf55JfJy/0UlrF0nZT2ZDhmsWnNzhbAySI= -k8s.io/apimachinery v0.0.0-20201003235655-10b38829b621/go.mod h1:6s3VNb000AUbBIxR7q3WHlbBwfpEGqIJsCG5gIX+0LI= -k8s.io/client-go v0.0.0-20201004000108-758467711e07/go.mod h1:zltjMDhkwCKG2AvwcACaWC+fwiTEB0qElpbFMvxx+/g= +k8s.io/api v0.0.0-20201005155906-2c3c141c931c/go.mod h1:aE9OxkIvJAf55JfJy/0UlrF0nZT2ZDhmsWnNzhbAySI= +k8s.io/apimachinery v0.0.0-20201006035708-d4f471b82f0a/go.mod h1:6s3VNb000AUbBIxR7q3WHlbBwfpEGqIJsCG5gIX+0LI= +k8s.io/client-go v0.0.0-20201007120136-5682372f3538/go.mod h1:QvSrA3Tta0gMhxbrDVuu/Nf+5+qRalN2AXEQPQpgLPQ= k8s.io/component-base v0.0.0-20201004000625-609bde980a40/go.mod h1:WUOeHeahPXuwTMTTZxsAJUyXuIRVbvgNTCVvlzwewJU= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= diff --git a/pkg/storageversion/OWNERS b/pkg/storageversion/OWNERS new file mode 100644 index 000000000..ca9aa1358 --- /dev/null +++ b/pkg/storageversion/OWNERS @@ -0,0 +1,5 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: +- caesarxuchao +- roycaihw diff --git a/pkg/storageversion/manager.go b/pkg/storageversion/manager.go new file mode 100644 index 000000000..5903eac93 --- /dev/null +++ b/pkg/storageversion/manager.go @@ -0,0 +1,219 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "fmt" + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration + "k8s.io/klog/v2" +) + +// ResourceInfo contains the information to register the resource to the +// storage version API. +type ResourceInfo struct { + GroupResource schema.GroupResource + + EncodingVersion string + // Used to calculate decodable versions. Can only be used after all + // equivalent versions are registered by InstallREST. + EquivalentResourceMapper runtime.EquivalentResourceRegistry +} + +// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions. +type Manager interface { + // AddResourceInfo records resources whose StorageVersions need updates + AddResourceInfo(resources ...*ResourceInfo) + // UpdateStorageVersions tries to update the StorageVersions of the recorded resources + UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string) + // PendingUpdate returns true if the StorageVersion of the given resource is still pending update. + PendingUpdate(gr schema.GroupResource) bool + // LastUpdateError returns the last error hit when updating the storage version of the given resource. + LastUpdateError(gr schema.GroupResource) error + // Completed returns true if updating StorageVersions of all recorded resources has completed. + Completed() bool +} + +var _ Manager = &defaultManager{} + +// defaultManager indicates if an apiserver has completed reporting its storage versions. +type defaultManager struct { + completed atomic.Value + + mu sync.RWMutex + // managedResourceInfos records the ResourceInfos whose StorageVersions will get updated in the next + // UpdateStorageVersions call + managedResourceInfos map[*ResourceInfo]struct{} + // managedStatus records the update status of StorageVersion for each GroupResource. Since one + // ResourceInfo may expand into multiple GroupResource (e.g. ingresses.networking.k8s.io and ingresses.extensions), + // this map allows quick status lookup for a GroupResource, during API request handling. + managedStatus map[schema.GroupResource]*updateStatus +} + +type updateStatus struct { + done bool + lastErr error +} + +// NewDefaultManager creates a new defaultManager. +func NewDefaultManager() Manager { + s := &defaultManager{} + s.completed.Store(false) + s.managedResourceInfos = make(map[*ResourceInfo]struct{}) + s.managedStatus = make(map[schema.GroupResource]*updateStatus) + return s +} + +// AddResourceInfo adds ResourceInfo to the manager. +func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) { + s.mu.Lock() + defer s.mu.Unlock() + for _, r := range resources { + s.managedResourceInfos[r] = struct{}{} + s.addPendingManagedStatusLocked(r) + } +} + +func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") + for _, gvr := range gvrs { + s.managedStatus[gvr.GroupResource()] = &updateStatus{} + } +} + +// UpdateStorageVersions tries to update the StorageVersions of the recorded resources +func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) { + clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err)) + return + } + sc := clientset.InternalV1alpha1().StorageVersions() + + s.mu.RLock() + resources := make([]*ResourceInfo, len(s.managedResourceInfos)) + for resource := range s.managedResourceInfos { + resources = append(resources, resource) + } + s.mu.RUnlock() + hasFailure := false + for _, r := range resources { + dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource) + if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err)) + s.recordStatusFailure(r, err) + hasFailure = true + continue + } + klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource) + s.recordStatusSuccess(r) + } + if hasFailure { + return + } + klog.V(2).Infof("storage version updates complete") + s.setComplete() +} + +// recordStatusSuccess marks updated ResourceInfo as completed. +func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) { + s.mu.Lock() + defer s.mu.Unlock() + s.recordStatusSuccessLocked(r) +} + +func (s *defaultManager) recordStatusSuccessLocked(r *ResourceInfo) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") + for _, gvr := range gvrs { + s.recordSuccessGroupResourceLocked(gvr.GroupResource()) + } +} + +func (s *defaultManager) recordSuccessGroupResourceLocked(gr schema.GroupResource) { + if _, ok := s.managedStatus[gr]; !ok { + return + } + s.managedStatus[gr].done = true + s.managedStatus[gr].lastErr = nil +} + +// recordStatusFailure records latest error updating ResourceInfo. +func (s *defaultManager) recordStatusFailure(r *ResourceInfo, err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.recordStatusFailureLocked(r, err) +} + +func (s *defaultManager) recordStatusFailureLocked(r *ResourceInfo, err error) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") + for _, gvr := range gvrs { + s.recordErrorGroupResourceLocked(gvr.GroupResource(), err) + } +} + +func (s *defaultManager) recordErrorGroupResourceLocked(gr schema.GroupResource, err error) { + if _, ok := s.managedStatus[gr]; !ok { + return + } + s.managedStatus[gr].lastErr = err +} + +// PendingUpdate returns if the StorageVersion of a resource is still wait to be updated. +func (s *defaultManager) PendingUpdate(gr schema.GroupResource) bool { + s.mu.RLock() + defer s.mu.RUnlock() + if _, ok := s.managedStatus[gr]; !ok { + return false + } + return !s.managedStatus[gr].done +} + +// LastUpdateError returns the last error hit when updating the storage version of the given resource. +func (s *defaultManager) LastUpdateError(gr schema.GroupResource) error { + s.mu.RLock() + defer s.mu.RUnlock() + if _, ok := s.managedStatus[gr]; !ok { + return fmt.Errorf("couldn't find managed status for %v", gr) + } + return s.managedStatus[gr].lastErr +} + +// setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore. +func (s *defaultManager) setComplete() { + s.completed.Store(true) +} + +// Completed returns if updating StorageVersions has completed. +func (s *defaultManager) Completed() bool { + return s.completed.Load().(bool) +} + +func decodableVersions(e runtime.EquivalentResourceRegistry, gr schema.GroupResource) []string { + var versions []string + decodingGVRs := e.EquivalentResourcesFor(gr.WithVersion(""), "") + for _, v := range decodingGVRs { + versions = append(versions, v.GroupVersion().String()) + } + return versions +} diff --git a/pkg/storageversion/updater.go b/pkg/storageversion/updater.go new file mode 100644 index 000000000..110accf22 --- /dev/null +++ b/pkg/storageversion/updater.go @@ -0,0 +1,122 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "context" + "fmt" + "time" + + "k8s.io/api/apiserverinternal/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" +) + +// Client has the methods required to update the storage version. +type Client interface { + Create(context.Context, *v1alpha1.StorageVersion, metav1.CreateOptions) (*v1alpha1.StorageVersion, error) + Update(context.Context, *v1alpha1.StorageVersion, metav1.UpdateOptions) (*v1alpha1.StorageVersion, error) + Get(context.Context, string, metav1.GetOptions) (*v1alpha1.StorageVersion, error) +} + +func setCommonEncodingVersion(sv *v1alpha1.StorageVersion) { + if len(sv.Status.StorageVersions) == 0 { + return + } + firstVersion := sv.Status.StorageVersions[0].EncodingVersion + agreed := true + for _, ssv := range sv.Status.StorageVersions { + if ssv.EncodingVersion != firstVersion { + agreed = false + break + } + } + if agreed { + sv.Status.CommonEncodingVersion = &firstVersion + } else { + sv.Status.CommonEncodingVersion = nil + } +} + +// updateStorageVersionFor updates the storage version object for the resource. +func updateStorageVersionFor(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error { + retries := 3 + var retry int + var err error + for retry < retries { + err = singleUpdate(c, apiserverID, gr, encodingVersion, decodableVersions) + if err == nil { + return nil + } + if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) { + time.Sleep(1 * time.Second) + continue + } + if err != nil { + klog.Errorf("retry %d, failed to update storage version for %v: %v", retry, gr, err) + retry++ + time.Sleep(1 * time.Second) + } + } + return err +} + +func singleUpdate(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error { + shouldCreate := false + name := fmt.Sprintf("%s.%s", gr.Group, gr.Resource) + sv, err := c.Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if apierrors.IsNotFound(err) { + shouldCreate = true + sv = &v1alpha1.StorageVersion{} + sv.ObjectMeta.Name = name + } + updatedSV := localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions) + if shouldCreate { + _, err := c.Create(context.TODO(), updatedSV, metav1.CreateOptions{}) + return err + } + _, err = c.Update(context.TODO(), updatedSV, metav1.UpdateOptions{}) + return err +} + +// localUpdateStorageVersion updates the input storageversion with given server storageversion info. +// The function updates the input storageversion in place. +func localUpdateStorageVersion(sv *v1alpha1.StorageVersion, apiserverID, encodingVersion string, decodableVersions []string) *v1alpha1.StorageVersion { + newSSV := v1alpha1.ServerStorageVersion{ + APIServerID: apiserverID, + EncodingVersion: encodingVersion, + DecodableVersions: decodableVersions, + } + foundSSV := false + for i, ssv := range sv.Status.StorageVersions { + if ssv.APIServerID == apiserverID { + sv.Status.StorageVersions[i] = newSSV + foundSSV = true + break + } + } + if !foundSSV { + sv.Status.StorageVersions = append(sv.Status.StorageVersions, newSSV) + } + setCommonEncodingVersion(sv) + return sv +} diff --git a/pkg/storageversion/updater_test.go b/pkg/storageversion/updater_test.go new file mode 100644 index 000000000..be899a280 --- /dev/null +++ b/pkg/storageversion/updater_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/api/apiserverinternal/v1alpha1" +) + +func TestLocalUpdateStorageVersion(t *testing.T) { + v1 := "v1" + ssv1 := v1alpha1.ServerStorageVersion{ + APIServerID: "1", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2"}, + } + ssv2 := v1alpha1.ServerStorageVersion{ + APIServerID: "2", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2"}, + } + // ssv3 has a different encoding version + ssv3 := v1alpha1.ServerStorageVersion{ + APIServerID: "3", + EncodingVersion: "v2", + DecodableVersions: []string{"v1", "v2"}, + } + ssv4 := v1alpha1.ServerStorageVersion{ + APIServerID: "4", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2", "v4"}, + } + tests := []struct { + old v1alpha1.StorageVersionStatus + newSSV v1alpha1.ServerStorageVersion + expected v1alpha1.StorageVersionStatus + }{ + { + old: v1alpha1.StorageVersionStatus{}, + newSSV: ssv1, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1}, + CommonEncodingVersion: &v1, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, + CommonEncodingVersion: &v1, + }, + newSSV: ssv3, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, + CommonEncodingVersion: &v1, + }, + newSSV: ssv4, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv4}, + CommonEncodingVersion: &v1, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, + }, + newSSV: ssv4, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3, ssv4}, + }, + }, + } + + for _, tc := range tests { + sv := &v1alpha1.StorageVersion{Status: tc.old} + updated := localUpdateStorageVersion(sv, tc.newSSV.APIServerID, tc.newSSV.EncodingVersion, tc.newSSV.DecodableVersions) + if e, a := tc.expected, updated.Status; !reflect.DeepEqual(e, a) { + t.Errorf("unexpected: %v", cmp.Diff(e, a)) + } + } +}