cacher: Add support for consistent streaming

Design details: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details

Kubernetes-commit: 7c7e7733050c22ab855c1b1c717b3514b953503e
Author: Lukasz Szaszkiewicz, 2023-02-27 13:32:38 +01:00 (committed by Kubernetes Publisher)
parent 5b2e0c750b
commit ff7564e339
2 changed files with 455 additions and 31 deletions

cacher.go

@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"sync"
"time"
@@ -34,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
@@ -290,6 +290,9 @@ type Cacher struct {
// newFunc is a function that creates a new empty object for storing an object of type Type.
newFunc func() runtime.Object
// newListFunc is a function that creates a new empty list for storing objects of type Type.
newListFunc func() runtime.Object
// indexedTrigger is used for optimizing the number of watchers that need to process
// an incoming event.
indexedTrigger *indexedTriggerFunc
@@ -371,6 +374,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
groupResource: config.GroupResource,
versioner: config.Versioner,
newFunc: config.NewFunc,
newListFunc: config.NewListFunc,
indexedTrigger: indexedTrigger,
watcherIdx: 0,
watchers: indexedWatchers{
@@ -498,19 +502,18 @@ type namespacedName struct {
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
if opts.SendInitialEvents != nil {
return nil, errors.NewInvalid(
schema.GroupKind{Group: c.groupResource.Group, Kind: c.groupResource.Resource},
"",
field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is not yet implemented by the watch cache")},
)
}
pred := opts.Predicate
// If the resourceVersion is unset, ensure that the rv
// from which the watch is being served, is the latest
// if the watch-list feature wasn't set and the resourceVersion is unset
// ensure that the rv from which the watch is being served, is the latest
// one. "latest" is ensured by serving the watch from
// the underlying storage.
if opts.ResourceVersion == "" {
//
// it should never happen due to our validation but let's just be super-safe here
// and disable sendingInitialEvents when the feature wasn't enabled
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
@@ -553,6 +556,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// watchers on our watcher having a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine a function that computes the bookmarkAfterResourceVersion
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine a function that computes the watchRV we should start from
startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine watch timeout('0' means deadline is not set, ignore checking)
deadline, _ := ctx.Deadline()
@@ -580,6 +595,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
watchRV = startWatchResourceVersionFn()
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
@@ -593,6 +609,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
// Update the bookmarkAfterResourceVersion
watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
// Add it to the queue only when the client supports watch bookmarks.
@@ -1165,6 +1183,88 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
return c.versioner.ParseResourceVersion(resourceVersion)
}
// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
// this method issues an empty list request and reads only the ResourceVersion from the object metadata
func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) {
if c.newListFunc == nil {
return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType)
}
emptyList := c.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1, // just in case we actually hit something
}
err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
if err != nil {
return 0, err
}
emptyListAccessor, err := meta.ListAccessor(emptyList)
if err != nil {
return 0, err
}
if emptyListAccessor == nil {
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
}
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
if err != nil {
return 0, err
}
if currentResourceVersion == 0 {
return 0, fmt.Errorf("the current resource version must be greater than 0")
}
return uint64(currentResourceVersion), nil
}
// getBookmarkAfterResourceVersionLockedFunc returns a function that
// yields a ResourceVersion after which the bookmark event will be delivered.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents == false || !opts.Predicate.AllowWatchBookmarks {
return func() uint64 { return 0 }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
}
// getStartResourceVersionForWatchLockedFunc returns a function that
// yields the ResourceVersion the watch will be started from.
// Depending on the input parameters the semantics of the returned ResourceVersion are:
// - start at Exact (return parsedWatchResourceVersion)
// - start at Most Recent (return an RV from etcd)
// - start at Any (return the current watchCache's RV)
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents == true {
return func() uint64 { return parsedWatchResourceVersion }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
}
// getCommonResourceVersionLockedFunc is a helper that computes a ResourceVersion
// based on the input parameters. Please examine callers of this method to get more context.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
switch {
case len(opts.ResourceVersion) == 0:
rv, err := c.getCurrentResourceVersionFromStorage(ctx)
if err != nil {
return nil, err
}
return func() uint64 { return rv }, nil
case parsedWatchResourceVersion == 0:
// here we assume that the watchCache lock is already held
return func() uint64 { return c.watchCache.resourceVersion }, nil
default:
return func() uint64 { return parsedWatchResourceVersion }, nil
}
}
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage storage.Interface

cacher_test.go

@@ -27,6 +27,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -41,8 +44,14 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
)
@@ -76,6 +85,9 @@ func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return strconv.ParseUint(version, 10, 64)
}
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
if len(resourceVersion) == 0 {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
}
@@ -111,7 +123,8 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
type dummyStorage struct {
sync.RWMutex
err error
err error
getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
}
type dummyWatch struct {
@@ -151,10 +164,12 @@ func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _
return d.err
}
func (d *dummyStorage) GetList(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts storage.ListOptions, listObj runtime.Object) error {
if d.getListFn != nil {
return d.getListFn(ctx, resPrefix, opts, listObj)
}
d.RLock()
defer d.RUnlock()
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
return d.err
@@ -1082,28 +1097,68 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
}
}
func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) {
func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event, strictOrder bool) {
_, _, line, _ := goruntime.Caller(1)
for _, expectedEvent := range events {
actualEvents := make([]watch.Event, len(events))
for idx := range events {
select {
case event := <-w.ResultChan():
if e, a := expectedEvent.Type, event.Type; e != a {
t.Logf("(called from line %d)", line)
t.Errorf("Expected: %s, got: %s", e, a)
}
object := event.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) {
t.Logf("(called from line %d)", line)
t.Errorf("Expected: %#v, got: %#v", e, a)
}
actualEvents[idx] = event
case <-time.After(wait.ForeverTestTimeout):
t.Logf("(called from line %d)", line)
t.Errorf("Timed out waiting for an event")
}
}
validateEvents := func(expected, actual watch.Event) (bool, []string) {
errors := []string{}
if e, a := expected.Type, actual.Type; e != a {
errors = append(errors, fmt.Sprintf("Expected: %s, got: %s", e, a))
}
actualObject := actual.Object
if co, ok := actualObject.(runtime.CacheableObject); ok {
actualObject = co.GetObject()
}
if e, a := expected.Object, actualObject; !apiequality.Semantic.DeepEqual(e, a) {
errors = append(errors, fmt.Sprintf("Expected: %#v, got: %#v", e, a))
}
return len(errors) == 0, errors
}
if len(events) != len(actualEvents) {
t.Fatalf("unexpected number of events: %d, expected: %d, acutalEvents: %#v, expectedEvents:%#v", len(actualEvents), len(events), actualEvents, events)
}
if strictOrder {
for idx, expectedEvent := range events {
valid, errors := validateEvents(expectedEvent, actualEvents[idx])
if !valid {
t.Logf("(called from line %d)", line)
for _, err := range errors {
t.Error(err)
}
}
}
}
for _, expectedEvent := range events {
validated := false
for _, actualEvent := range actualEvents {
if validated, _ = validateEvents(expectedEvent, actualEvent); validated {
break
}
}
if !validated {
t.Fatalf("Expected: %#v but didn't find", expectedEvent)
}
}
}
func verifyNoEvents(t *testing.T, w watch.Interface) {
select {
case e := <-w.ResultChan():
t.Errorf("Unexpected: %#v event received, expected no events", e)
case <-time.After(time.Second):
return
}
}
func TestCachingDeleteEvents(t *testing.T) {
@@ -1183,9 +1238,9 @@ func TestCachingDeleteEvents(t *testing.T) {
cacher.watchCache.Update(pod3)
cacher.watchCache.Delete(pod4)
verifyEvents(t, allEventsWatcher, allEvents)
verifyEvents(t, fooEventsWatcher, fooEvents)
verifyEvents(t, barEventsWatcher, barEvents)
verifyEvents(t, allEventsWatcher, allEvents, true)
verifyEvents(t, fooEventsWatcher, fooEvents, true)
verifyEvents(t, barEventsWatcher, barEvents, true)
}
func testCachingObjects(t *testing.T, watchersCount int) {
@@ -1367,3 +1422,272 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received)
}
}
func TestCacherWatchSemantics(t *testing.T) {
trueVal, falseVal := true, false
makePod := func(rv uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
scenarios := []struct {
name string
allowWatchBookmarks bool
sendInitialEvents *bool
resourceVersion string
storageResourceVersion string
initialPods []*example.Pod
podsAfterEstablishingWatch []*example.Pod
expectedInitialEventsInStrictOrder []watch.Event
expectedInitialEventsInRandomOrder []watch.Event
expectedEventsAfterEstablishingWatch []watch.Event
}{
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
storageResourceVersion: "102",
initialPods: []*example.Pod{makePod(101)},
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
expectedEventsAfterEstablishingWatch: []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{
{Type: watch.Added, Object: makePod(101)},
{Type: watch.Added, Object: makePod(102)},
},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "101",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102",
sendInitialEvents: &trueVal,
storageResourceVersion: "102",
initialPods: []*example.Pod{makePod(101)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
},
{
// note we set storage's RV to some future value; it mustn't be used by this scenario
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
sendInitialEvents: &trueVal,
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
// note we set storage's RV to some future value; it mustn't be used by this scenario
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
sendInitialEvents: &trueVal,
resourceVersion: "101",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// make sure we only get initial events that are > initial RV (101)
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
},
{
name: "sendInitialEvents=false, RV=unset, storageRV=103",
sendInitialEvents: &falseVal,
storageResourceVersion: "103",
initialPods: []*example.Pod{makePod(101), makePod(102)},
podsAfterEstablishingWatch: []*example.Pod{makePod(104)},
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}},
},
{
// note we set storage's RV to some future value; it mustn't be used by this scenario
name: "sendInitialEvents=false, RV=0, storageRV=105",
sendInitialEvents: &falseVal,
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
podsAfterEstablishingWatch: []*example.Pod{makePod(103)},
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}},
},
{
// note we set storage's RV to some future value; it mustn't be used by this scenario
name: "legacy, RV=0, storageRV=105",
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
// note we set storage's RV to some future value; it mustn't be used by this scenario
name: "legacy, RV=unset, storageRV=105",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// no events because the watch is delegated to the underlying storage
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
// set up env
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
storageListMetaResourceVersion := ""
backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
return nil
}}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("falied to create cacher: %v", err)
}
defer cacher.Stop()
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// now, run a scenario
// but first let's add some initial data
for _, obj := range scenario.initialPods {
err = cacher.watchCache.Add(obj)
require.NoError(t, err, "failed to add a pod: %v")
}
// read request params
opts := storage.ListOptions{Predicate: storage.Everything}
opts.SendInitialEvents = scenario.sendInitialEvents
opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
if len(scenario.resourceVersion) > 0 {
opts.ResourceVersion = scenario.resourceVersion
}
// before starting a new watch set a storage RV to some future value
storageListMetaResourceVersion = scenario.storageResourceVersion
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
// make sure we only get initial events
verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false)
verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true)
verifyNoEvents(t, w)
// add a pod with an RV greater than the storage's RV observed when the watch was started
for _, obj := range scenario.podsAfterEstablishingWatch {
err = cacher.watchCache.Add(obj)
require.NoError(t, err, "failed to add a pod: %v")
}
verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true)
verifyNoEvents(t, w)
})
}
}
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
// test data
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, example.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
server, etcdStorage := newEtcdTestStorage(t, "")
defer server.Terminate(t)
podCacher, versioner, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create podCacher: %v", err)
}
defer podCacher.Stop()
makePod := func(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
}
}
createPod := func(obj *example.Pod) *example.Pod {
key := "pods/" + obj.Namespace + "/" + obj.Name
out := &example.Pod{}
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
require.NoError(t, err)
return out
}
getPod := func(name, ns string) *example.Pod {
key := "pods/" + ns + "/" + name
out := &example.Pod{}
err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out)
require.NoError(t, err)
return out
}
makeReplicaSet := func(name string) *example.ReplicaSet {
return &example.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
}
}
createReplicaSet := func(obj *example.ReplicaSet) *example.ReplicaSet {
key := "replicasets/" + obj.Namespace + "/" + obj.Name
out := &example.ReplicaSet{}
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
require.NoError(t, err)
return out
}
// create a pod and make sure its RV is equal to the one maintained by etcd
pod := createPod(makePod("pod-1"))
currentStorageRV, err := podCacher.getCurrentResourceVersionFromStorage(context.TODO())
require.NoError(t, err)
podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
require.NoError(t, err)
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
// now create a replicaset (new resource) and make sure the target function returns global etcd RV
rs := createReplicaSet(makeReplicaSet("replicaset-1"))
currentStorageRV, err = podCacher.getCurrentResourceVersionFromStorage(context.TODO())
require.NoError(t, err)
rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
require.NoError(t, err)
require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
// ensure that the pod's RV hasn't been changed
currentPod := getPod(pod.Name, pod.Namespace)
currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion)
require.NoError(t, err)
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
}