Generalize watch storage tests

Kubernetes-commit: 8266c4d934d42a5175a84bff10fda4bf36f13817
This commit is contained in:
Wojciech Tyczyński 2023-06-27 18:12:30 +02:00 committed by Kubernetes Publisher
parent 122f3d90e6
commit b560936651
5 changed files with 85 additions and 173 deletions

View File

@ -19,13 +19,9 @@ package cacher
import (
"context"
"fmt"
goruntime "runtime"
"strconv"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -33,19 +29,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
const (
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
watchCacheDefaultCapacity = 100
)
func init() {
@ -85,41 +74,6 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source
}
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*Cacher, storage.Versioner, error) {
prefix := "pods"
v := storage.APIObjectVersioner{}
config := Config{
Storage: s,
Versioner: v,
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetPodAttrs,
NewFunc: newPod,
NewListFunc: newPodList,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock,
}
cacher, err := NewCacherFromConfig(config)
return cacher, v, err
}
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return obj.DeepCopyObject(), nil, nil
}
key := "pods/" + obj.Namespace + "/" + obj.Name
if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn, nil); err != nil {
t.Errorf("unexpected error: %v", err)
}
obj.ResourceVersion = ""
result := &example.Pod{}
if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil {
t.Errorf("unexpected error: %v", err)
}
return result
}
func checkStorageInvariants(ctx context.Context, t *testing.T, key string) {
// No-op function since cacher simply passes object creation to the underlying storage.
}
@ -231,7 +185,10 @@ func TestListContinuationWithFilter(t *testing.T) {
}
func TestListInconsistentContinuation(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
// TODO(#109831): Enable use of this by setting compaction.
storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil)
}
func TestConsistentList(t *testing.T) {
@ -274,28 +231,6 @@ func TestCount(t *testing.T) {
storagetesting.RunTestCount(ctx, t, cacher)
}
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
_, _, line, _ := goruntime.Caller(1)
select {
case event := <-w.ResultChan():
if e, a := eventType, event.Type; e != a {
t.Logf("(called from line %d)", line)
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
}
object := event.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Logf("(called from line %d)", line)
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
}
case <-time.After(wait.ForeverTestTimeout):
t.Logf("(called from line %d)", line)
t.Errorf("Timed out waiting for an event")
}
}
func TestWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
@ -305,7 +240,7 @@ func TestWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, nil)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher))
}
func TestDeleteTriggerWatch(t *testing.T) {
@ -364,96 +299,6 @@ func TestNamespaceScopedWatch(t *testing.T) {
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
}
// TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the
// scenarios that are not yet covered by it and get rid of this test.
func TestWatchDeprecated(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
defer server.Terminate(t)
fakeClock := testingclock.NewFakeClock(time.Now())
cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
podFoo := makeTestPodWithName("foo")
podBar := makeTestPodWithName("bar")
podFooPrime := makeTestPodWithName("foo")
podFooPrime.Spec.NodeName = "fakeNode"
podFooBis := makeTestPodWithName("foo")
podFooBis.Spec.NodeName = "anotherFakeNode"
podFooNS2 := makeTestPodWithName("foo")
podFooNS2.Namespace += "2"
// initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(initialVersion))
// Set up Watch for object "podFoo".
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil)
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer initialWatcher.Stop()
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
// Now test watch from "now".
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nowWatcher.Stop()
verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
// Add watchCacheDefaultCapacity events to make current watch cache full.
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
for i := 0; i < watchCacheDefaultCapacity; i++ {
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
podFoo := makeTestPodWithName(fmt.Sprintf("foo-%d", i))
updatePod(t, etcdStorage, podFoo, nil)
}
// Check whether we get too-old error via the watch channel
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Expected no direct error, got %v", err)
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error.
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
}
func TestWatchDispatchBookmarkEvents(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)

View File

@ -17,6 +17,7 @@ limitations under the License.
package cacher
import (
"context"
"fmt"
"testing"
@ -66,13 +67,53 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
return server, storage
}
func makeTestPodWithName(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: storagetesting.DeepEqualSafePodSpec(),
}
}
func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}
func compactStorage(c *Cacher) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion)
if err != nil {
t.Fatal(err)
}
err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
if err != nil {
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
}
c.watchCache.RUnlock()
c.watchCache.Lock()
defer c.watchCache.Unlock()
c.Lock()
defer c.Unlock()
if c.watchCache.resourceVersion < rv {
t.Fatalf("Can't compact into a future version: %v", resourceVersion)
}
if rv < c.watchCache.listResourceVersion {
t.Fatalf("Can't compact into a past version: %v", resourceVersion)
}
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
// We could consider terminating those watchers, but given
// watchcache doesn't really support compaction and we don't
// exercise it in tests, we just throw an error here.
t.Error("Open watchers are not supported during compaction")
}
for c.watchCache.startIndex < c.watchCache.endIndex {
index := c.watchCache.startIndex % c.watchCache.capacity
if c.watchCache.cache[index].ResourceVersion > rv {
break
}
c.watchCache.startIndex++
}
c.watchCache.listResourceVersion = rv
// TODO(wojtek-t): We should also compact the underlying etcd storage.
}
}

View File

@ -305,7 +305,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if err := func() error {
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// is created. In such situation, the only problematic scenario is Replace()
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()

View File

@ -150,7 +150,7 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I
}
}
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj runtime.Object) {
testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
ExpectNoDiff(t, "incorrect object", expectObj, object)
return nil

View File

@ -19,12 +19,15 @@ package testing
import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -238,8 +241,8 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter
}
// Update again
out = &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
newOut := &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, newOut, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, nil
}), nil)
@ -248,14 +251,37 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter
}
// Compact previous versions
compaction(ctx, t, out.ResourceVersion)
compaction(ctx, t, newOut.ResourceVersion)
// Make sure we can still watch from 0 and receive an ADDED event
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
defer w.Stop()
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
testCheckResult(t, watch.Added, w, out)
testCheckResult(t, watch.Added, w, newOut)
// Make sure we can't watch from older resource versions anymoer and get a "Gone" error.
tooOldWatcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
defer tooOldWatcher.Stop()
expiredError := errors.NewResourceExpired("").ErrStatus
// TODO(wojtek-t): It seems that etcd is currently returning a different error,
// being an Internal error of "etcd event received with PrevKv=nil".
// We temporary allow both but we should unify here.
internalError := metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonInternalError,
}
testCheckResultFunc(t, watch.Error, tooOldWatcher, func(obj runtime.Object) error {
if !apiequality.Semantic.DeepDerivative(&expiredError, obj) && !apiequality.Semantic.DeepDerivative(&internalError, obj) {
t.Errorf("expected: %#v; got %#v", &expiredError, obj)
}
return nil
})
}
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {