resource version parsing should all be in one place

Kubernetes-commit: 023895d597be6539a1a16fa54d60e47a17d85dc1
This commit is contained in:
Daniel Smith 2018-01-10 10:43:59 -08:00 committed by Kubernetes Publisher
parent ceea762f7c
commit 4b163fbe32
12 changed files with 134 additions and 108 deletions

View File

@ -20,7 +20,6 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"sync"
"time"
@ -290,7 +289,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion)
watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
@ -361,7 +360,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
getRV, err := ParseListResourceVersion(resourceVersion)
getRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
@ -414,7 +413,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
@ -483,7 +482,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
@ -711,11 +710,7 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
resourceVersion := c.reflector.LastSyncResourceVersion()
if resourceVersion == "" {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
return c.versioner.ParseListResourceVersion(resourceVersion)
}
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.

View File

@ -203,3 +203,9 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
}
func (testVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}
func (testVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}

View File

@ -58,6 +58,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/cache:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage"
)
@ -81,6 +82,44 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
return strconv.ParseUint(version, 10, 64)
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, storage.NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// ParseListResourceVersion takes a resource version argument and converts it to
// the etcd version.
// TODO: reevaluate whether it is really clearer to have both this and the
// Watch version of this function, since they perform the same logic.
func (a APIObjectVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, storage.NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// APIObjectVersioner implements Versioner
var Versioner storage.Versioner = APIObjectVersioner{}

View File

@ -20,6 +20,7 @@ import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/storage"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
)
@ -40,6 +41,43 @@ func TestObjectVersioner(t *testing.T) {
}
}
func TestEtcdParseResourceVersion(t *testing.T) {
testCases := []struct {
Version string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 1},
{Version: "10", ExpectVersion: 10},
}
v := APIObjectVersioner{}
testFuncs := []func(string) (uint64, error){
v.ParseListResourceVersion,
v.ParseWatchResourceVersion,
}
for _, testCase := range testCases {
for i, f := range testFuncs {
version, err := f(testCase.Version)
switch {
case testCase.Err && err == nil:
t.Errorf("%s[%v]: unexpected non-error", testCase.Version, i)
case testCase.Err && !storage.IsInvalidError(err):
t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err)
case !testCase.Err && err != nil:
t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err)
}
if version != testCase.ExpectVersion {
t.Errorf("%s[%v]: expected version %d but was %d", testCase.Version, i, testCase.ExpectVersion, version)
}
}
}
}
func TestCompareResourceVersion(t *testing.T) {
five := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}
six := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}

View File

@ -235,7 +235,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
@ -250,7 +250,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}

View File

@ -24,7 +24,6 @@ import (
"fmt"
"path"
"reflect"
"strconv"
"strings"
"time"
@ -524,14 +523,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV))
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = fromRV
returnedRV = int64(fromRV)
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
@ -539,14 +538,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
default:
if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV))
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = fromRV
returnedRV = int64(fromRV)
}
options = append(options, clientv3.WithPrefix())
@ -666,7 +665,7 @@ func (s *store) WatchList(ctx context.Context, key string, resourceVersion strin
}
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
rev, err := storage.ParseWatchResourceVersion(rv)
rev, err := s.versioner.ParseWatchResourceVersion(rv)
if err != nil {
return nil, err
}

View File

@ -19,7 +19,6 @@ package etcd3
import (
"fmt"
"reflect"
"strconv"
"sync"
"testing"
"time"
@ -186,7 +185,7 @@ func TestWatchFromZero(t *testing.T) {
}
// Compact previous versions
revToCompact, err := strconv.Atoi(out.ResourceVersion)
revToCompact, err := store.versioner.ParseListResourceVersion(out.ResourceVersion)
if err != nil {
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
}
@ -305,7 +304,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
var wres clientv3.WatchResponse
wres = <-etcdW
watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
watchedDeleteRev, err := store.versioner.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
if err != nil {
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
}

View File

@ -28,7 +28,9 @@ import (
// Versioner abstracts setting and retrieving metadata fields from database response
// onto the object ot list. It is required to maintain storage invariants - updating an
// object twice with the same data except for the ResourceVersion and SelfLink must be
// a no-op.
// a no-op. A resourceVersion of type uint64 is a 'raw' resourceVersion,
// intended to be sent directly to or from the backend. A resourceVersion of
// type string is a 'safe' resourceVersion, intended for consumption by users.
type Versioner interface {
// UpdateObject sets storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
@ -45,6 +47,17 @@ type Versioner interface {
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
// Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error)
// ParseWatchResourceVersion takes a resource version argument and
// converts it to the storage backend we should pass to helper.Watch().
// Because resourceVersion is an opaque value, the default watch
// behavior for non-zero watch is to watch the next value (if you pass
// "1", you will see updates from "2" onwards).
ParseWatchResourceVersion(resourceVersion string) (uint64, error)
// ParseListResourceVersion takes a resource version argument and
// converts it to the storage backend version. Appropriate for
// everything that's not intended as an argument for watch.
ParseListResourceVersion(resourceVersion string) (uint64, error)
}
// ResponseMeta contains information about the database metadata that is associated with

View File

@ -97,12 +97,13 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServe
return server, storage
}
func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versioner) {
prefix := "pods"
v := etcdstorage.APIObjectVersioner{}
config := storage.CacherConfig{
CacheCapacity: cap,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Versioner: v,
Type: &example.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
@ -110,7 +111,7 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return storage.NewCacherFromConfig(config)
return storage.NewCacherFromConfig(config), v
}
func makeTestPod(name string) *example.Pod {
@ -139,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
func TestGet(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -170,7 +171,7 @@ func TestGet(t *testing.T) {
func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -251,14 +252,14 @@ func TestList(t *testing.T) {
func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
// Set up List at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -307,7 +308,7 @@ func TestWatch(t *testing.T) {
// Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
cacher, _ := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
defer cacher.Stop()
podFoo := makeTestPod("foo")
@ -382,7 +383,7 @@ func TestWatch(t *testing.T) {
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// initialVersion is used to initate the watcher at the beginning of the world,
@ -424,7 +425,7 @@ func TestWatcherTimeout(t *testing.T) {
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// Ensure that the cacher is initialized, before creating any pods,
@ -486,7 +487,7 @@ func TestFiltering(t *testing.T) {
func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// add 1 object
@ -494,7 +495,7 @@ func TestStartingResourceVersion(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
// Set up Watch starting at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -517,7 +518,7 @@ func TestStartingResourceVersion(t *testing.T) {
select {
case e := <-watcher.ResultChan():
pod := e.Object.(*example.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
podRV, err := v.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -544,15 +545,15 @@ func TestEmptyWatchEventCache(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// get rv of last pod created
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
// It should support establishing watches from rv and higher, but not older.
@ -598,11 +599,11 @@ func TestEmptyWatchEventCache(t *testing.T) {
func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@ -18,14 +18,12 @@ package storage
import (
"fmt"
"strconv"
"strings"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
)
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
@ -50,35 +48,6 @@ func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// ParseListResourceVersion takes a resource version argument and converts it to
// the etcd version.
func ParseListResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
return version, err
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {

View File

@ -22,40 +22,6 @@ import (
"testing"
)
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 1},
{Version: "10", ExpectVersion: 10},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !IsInvalidError(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string