Merge pull request #92459 from roycaihw/storage-version/manager

Add storageversion manager interface

Kubernetes-commit: 6d01c5a58996d1619ac049c2b3077274299eb2d0
This commit is contained in:
Kubernetes Publisher 2020-10-14 03:19:55 -07:00
commit 968a41a8a7
7 changed files with 460 additions and 12 deletions

6
Godeps/Godeps.json generated
View File

@ -672,15 +672,15 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "18112a7b933b"
"Rev": "2c3c141c931c"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "10b38829b621"
"Rev": "d4f471b82f0a"
},
{
"ImportPath": "k8s.io/client-go",
"Rev": "758467711e07"
"Rev": "5682372f3538"
},
{
"ImportPath": "k8s.io/component-base",

12
go.mod
View File

@ -41,9 +41,9 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.0.0-20201003235837-18112a7b933b
k8s.io/apimachinery v0.0.0-20201003235655-10b38829b621
k8s.io/client-go v0.0.0-20201004000108-758467711e07
k8s.io/api v0.0.0-20201005155906-2c3c141c931c
k8s.io/apimachinery v0.0.0-20201006035708-d4f471b82f0a
k8s.io/client-go v0.0.0-20201007120136-5682372f3538
k8s.io/component-base v0.0.0-20201004000625-609bde980a40
k8s.io/klog/v2 v2.2.0
k8s.io/kube-openapi v0.0.0-20200923155610-8b5066479488
@ -54,8 +54,8 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20201003235837-18112a7b933b
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20201003235655-10b38829b621
k8s.io/client-go => k8s.io/client-go v0.0.0-20201004000108-758467711e07
k8s.io/api => k8s.io/api v0.0.0-20201005155906-2c3c141c931c
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20201006035708-d4f471b82f0a
k8s.io/client-go => k8s.io/client-go v0.0.0-20201007120136-5682372f3538
k8s.io/component-base => k8s.io/component-base v0.0.0-20201004000625-609bde980a40
)

6
go.sum
View File

@ -502,9 +502,9 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
k8s.io/api v0.0.0-20201003235837-18112a7b933b/go.mod h1:aE9OxkIvJAf55JfJy/0UlrF0nZT2ZDhmsWnNzhbAySI=
k8s.io/apimachinery v0.0.0-20201003235655-10b38829b621/go.mod h1:6s3VNb000AUbBIxR7q3WHlbBwfpEGqIJsCG5gIX+0LI=
k8s.io/client-go v0.0.0-20201004000108-758467711e07/go.mod h1:zltjMDhkwCKG2AvwcACaWC+fwiTEB0qElpbFMvxx+/g=
k8s.io/api v0.0.0-20201005155906-2c3c141c931c/go.mod h1:aE9OxkIvJAf55JfJy/0UlrF0nZT2ZDhmsWnNzhbAySI=
k8s.io/apimachinery v0.0.0-20201006035708-d4f471b82f0a/go.mod h1:6s3VNb000AUbBIxR7q3WHlbBwfpEGqIJsCG5gIX+0LI=
k8s.io/client-go v0.0.0-20201007120136-5682372f3538/go.mod h1:QvSrA3Tta0gMhxbrDVuu/Nf+5+qRalN2AXEQPQpgLPQ=
k8s.io/component-base v0.0.0-20201004000625-609bde980a40/go.mod h1:WUOeHeahPXuwTMTTZxsAJUyXuIRVbvgNTCVvlzwewJU=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=

View File

@ -0,0 +1,5 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- caesarxuchao
- roycaihw

View File

@ -0,0 +1,219 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversion
import (
"fmt"
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
"k8s.io/klog/v2"
)
// ResourceInfo contains the information to register the resource to the
// storage version API.
type ResourceInfo struct {
GroupResource schema.GroupResource
EncodingVersion string
// Used to calculate decodable versions. Can only be used after all
// equivalent versions are registered by InstallREST.
EquivalentResourceMapper runtime.EquivalentResourceRegistry
}
// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions.
type Manager interface {
// AddResourceInfo records resources whose StorageVersions need updates
AddResourceInfo(resources ...*ResourceInfo)
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string)
// PendingUpdate returns true if the StorageVersion of the given resource is still pending update.
PendingUpdate(gr schema.GroupResource) bool
// LastUpdateError returns the last error hit when updating the storage version of the given resource.
LastUpdateError(gr schema.GroupResource) error
// Completed returns true if updating StorageVersions of all recorded resources has completed.
Completed() bool
}
var _ Manager = &defaultManager{}
// defaultManager indicates if an apiserver has completed reporting its storage versions.
type defaultManager struct {
completed atomic.Value
mu sync.RWMutex
// managedResourceInfos records the ResourceInfos whose StorageVersions will get updated in the next
// UpdateStorageVersions call
managedResourceInfos map[*ResourceInfo]struct{}
// managedStatus records the update status of StorageVersion for each GroupResource. Since one
// ResourceInfo may expand into multiple GroupResource (e.g. ingresses.networking.k8s.io and ingresses.extensions),
// this map allows quick status lookup for a GroupResource, during API request handling.
managedStatus map[schema.GroupResource]*updateStatus
}
type updateStatus struct {
done bool
lastErr error
}
// NewDefaultManager creates a new defaultManager.
func NewDefaultManager() Manager {
s := &defaultManager{}
s.completed.Store(false)
s.managedResourceInfos = make(map[*ResourceInfo]struct{})
s.managedStatus = make(map[schema.GroupResource]*updateStatus)
return s
}
// AddResourceInfo adds ResourceInfo to the manager.
func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) {
s.mu.Lock()
defer s.mu.Unlock()
for _, r := range resources {
s.managedResourceInfos[r] = struct{}{}
s.addPendingManagedStatusLocked(r)
}
}
func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) {
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
for _, gvr := range gvrs {
s.managedStatus[gvr.GroupResource()] = &updateStatus{}
}
}
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) {
clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err))
return
}
sc := clientset.InternalV1alpha1().StorageVersions()
s.mu.RLock()
resources := make([]*ResourceInfo, len(s.managedResourceInfos))
for resource := range s.managedResourceInfos {
resources = append(resources, resource)
}
s.mu.RUnlock()
hasFailure := false
for _, r := range resources {
dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource)
if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err))
s.recordStatusFailure(r, err)
hasFailure = true
continue
}
klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource)
s.recordStatusSuccess(r)
}
if hasFailure {
return
}
klog.V(2).Infof("storage version updates complete")
s.setComplete()
}
// recordStatusSuccess marks updated ResourceInfo as completed.
func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) {
s.mu.Lock()
defer s.mu.Unlock()
s.recordStatusSuccessLocked(r)
}
func (s *defaultManager) recordStatusSuccessLocked(r *ResourceInfo) {
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
for _, gvr := range gvrs {
s.recordSuccessGroupResourceLocked(gvr.GroupResource())
}
}
func (s *defaultManager) recordSuccessGroupResourceLocked(gr schema.GroupResource) {
if _, ok := s.managedStatus[gr]; !ok {
return
}
s.managedStatus[gr].done = true
s.managedStatus[gr].lastErr = nil
}
// recordStatusFailure records latest error updating ResourceInfo.
func (s *defaultManager) recordStatusFailure(r *ResourceInfo, err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.recordStatusFailureLocked(r, err)
}
func (s *defaultManager) recordStatusFailureLocked(r *ResourceInfo, err error) {
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
for _, gvr := range gvrs {
s.recordErrorGroupResourceLocked(gvr.GroupResource(), err)
}
}
func (s *defaultManager) recordErrorGroupResourceLocked(gr schema.GroupResource, err error) {
if _, ok := s.managedStatus[gr]; !ok {
return
}
s.managedStatus[gr].lastErr = err
}
// PendingUpdate returns if the StorageVersion of a resource is still wait to be updated.
func (s *defaultManager) PendingUpdate(gr schema.GroupResource) bool {
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.managedStatus[gr]; !ok {
return false
}
return !s.managedStatus[gr].done
}
// LastUpdateError returns the last error hit when updating the storage version of the given resource.
func (s *defaultManager) LastUpdateError(gr schema.GroupResource) error {
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.managedStatus[gr]; !ok {
return fmt.Errorf("couldn't find managed status for %v", gr)
}
return s.managedStatus[gr].lastErr
}
// setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore.
func (s *defaultManager) setComplete() {
s.completed.Store(true)
}
// Completed returns if updating StorageVersions has completed.
func (s *defaultManager) Completed() bool {
return s.completed.Load().(bool)
}
func decodableVersions(e runtime.EquivalentResourceRegistry, gr schema.GroupResource) []string {
var versions []string
decodingGVRs := e.EquivalentResourcesFor(gr.WithVersion(""), "")
for _, v := range decodingGVRs {
versions = append(versions, v.GroupVersion().String())
}
return versions
}

View File

@ -0,0 +1,122 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversion
import (
"context"
"fmt"
"time"
"k8s.io/api/apiserverinternal/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
)
// Client has the methods required to update the storage version.
type Client interface {
Create(context.Context, *v1alpha1.StorageVersion, metav1.CreateOptions) (*v1alpha1.StorageVersion, error)
Update(context.Context, *v1alpha1.StorageVersion, metav1.UpdateOptions) (*v1alpha1.StorageVersion, error)
Get(context.Context, string, metav1.GetOptions) (*v1alpha1.StorageVersion, error)
}
func setCommonEncodingVersion(sv *v1alpha1.StorageVersion) {
if len(sv.Status.StorageVersions) == 0 {
return
}
firstVersion := sv.Status.StorageVersions[0].EncodingVersion
agreed := true
for _, ssv := range sv.Status.StorageVersions {
if ssv.EncodingVersion != firstVersion {
agreed = false
break
}
}
if agreed {
sv.Status.CommonEncodingVersion = &firstVersion
} else {
sv.Status.CommonEncodingVersion = nil
}
}
// updateStorageVersionFor updates the storage version object for the resource.
func updateStorageVersionFor(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error {
retries := 3
var retry int
var err error
for retry < retries {
err = singleUpdate(c, apiserverID, gr, encodingVersion, decodableVersions)
if err == nil {
return nil
}
if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) {
time.Sleep(1 * time.Second)
continue
}
if err != nil {
klog.Errorf("retry %d, failed to update storage version for %v: %v", retry, gr, err)
retry++
time.Sleep(1 * time.Second)
}
}
return err
}
func singleUpdate(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error {
shouldCreate := false
name := fmt.Sprintf("%s.%s", gr.Group, gr.Resource)
sv, err := c.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if apierrors.IsNotFound(err) {
shouldCreate = true
sv = &v1alpha1.StorageVersion{}
sv.ObjectMeta.Name = name
}
updatedSV := localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions)
if shouldCreate {
_, err := c.Create(context.TODO(), updatedSV, metav1.CreateOptions{})
return err
}
_, err = c.Update(context.TODO(), updatedSV, metav1.UpdateOptions{})
return err
}
// localUpdateStorageVersion updates the input storageversion with given server storageversion info.
// The function updates the input storageversion in place.
func localUpdateStorageVersion(sv *v1alpha1.StorageVersion, apiserverID, encodingVersion string, decodableVersions []string) *v1alpha1.StorageVersion {
newSSV := v1alpha1.ServerStorageVersion{
APIServerID: apiserverID,
EncodingVersion: encodingVersion,
DecodableVersions: decodableVersions,
}
foundSSV := false
for i, ssv := range sv.Status.StorageVersions {
if ssv.APIServerID == apiserverID {
sv.Status.StorageVersions[i] = newSSV
foundSSV = true
break
}
}
if !foundSSV {
sv.Status.StorageVersions = append(sv.Status.StorageVersions, newSSV)
}
setCommonEncodingVersion(sv)
return sv
}

View File

@ -0,0 +1,102 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversion
import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/api/apiserverinternal/v1alpha1"
)
func TestLocalUpdateStorageVersion(t *testing.T) {
v1 := "v1"
ssv1 := v1alpha1.ServerStorageVersion{
APIServerID: "1",
EncodingVersion: "v1",
DecodableVersions: []string{"v1", "v2"},
}
ssv2 := v1alpha1.ServerStorageVersion{
APIServerID: "2",
EncodingVersion: "v1",
DecodableVersions: []string{"v1", "v2"},
}
// ssv3 has a different encoding version
ssv3 := v1alpha1.ServerStorageVersion{
APIServerID: "3",
EncodingVersion: "v2",
DecodableVersions: []string{"v1", "v2"},
}
ssv4 := v1alpha1.ServerStorageVersion{
APIServerID: "4",
EncodingVersion: "v1",
DecodableVersions: []string{"v1", "v2", "v4"},
}
tests := []struct {
old v1alpha1.StorageVersionStatus
newSSV v1alpha1.ServerStorageVersion
expected v1alpha1.StorageVersionStatus
}{
{
old: v1alpha1.StorageVersionStatus{},
newSSV: ssv1,
expected: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1},
CommonEncodingVersion: &v1,
},
},
{
old: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2},
CommonEncodingVersion: &v1,
},
newSSV: ssv3,
expected: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3},
},
},
{
old: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2},
CommonEncodingVersion: &v1,
},
newSSV: ssv4,
expected: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv4},
CommonEncodingVersion: &v1,
},
},
{
old: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3},
},
newSSV: ssv4,
expected: v1alpha1.StorageVersionStatus{
StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3, ssv4},
},
},
}
for _, tc := range tests {
sv := &v1alpha1.StorageVersion{Status: tc.old}
updated := localUpdateStorageVersion(sv, tc.newSSV.APIServerID, tc.newSSV.EncodingVersion, tc.newSSV.DecodableVersions)
if e, a := tc.expected, updated.Status; !reflect.DeepEqual(e, a) {
t.Errorf("unexpected: %v", cmp.Diff(e, a))
}
}
}