Merge pull request #108414 from aojea/cacher_context

cacher: don't accept requests if stopped

Kubernetes-commit: 999b1bbe92947fca9e1fab67349ad11d3c46d328
This commit is contained in:
Kubernetes Publisher 2022-05-11 10:47:02 -07:00
commit 5f27f61940
6 changed files with 315 additions and 68 deletions

16
go.mod
View File

@ -38,10 +38,10 @@ require (
google.golang.org/grpc v1.44.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
k8s.io/api v0.0.0-20220509182417-9b88471ea2b1
k8s.io/apimachinery v0.0.0-20220509181918-47789511e916
k8s.io/client-go v0.0.0-20220509183031-24209264f769
k8s.io/component-base v0.0.0-20220509184231-9da133a032bf
k8s.io/api v0.0.0-20220510154143-ae35a85329f1
k8s.io/apimachinery v0.0.0-20220511125320-f3b1305f4010
k8s.io/client-go v0.0.0-20220511125810-77f63643f951
k8s.io/component-base v0.0.0-20220510143133-ee36a6a1367e
k8s.io/klog/v2 v2.60.1
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
@ -118,8 +118,8 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20220509182417-9b88471ea2b1
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220509181918-47789511e916
k8s.io/client-go => k8s.io/client-go v0.0.0-20220509183031-24209264f769
k8s.io/component-base => k8s.io/component-base v0.0.0-20220509184231-9da133a032bf
k8s.io/api => k8s.io/api v0.0.0-20220510154143-ae35a85329f1
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220511125320-f3b1305f4010
k8s.io/client-go => k8s.io/client-go v0.0.0-20220511125810-77f63643f951
k8s.io/component-base => k8s.io/component-base v0.0.0-20220510143133-ee36a6a1367e
)

16
go.sum
View File

@ -843,14 +843,14 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20220509182417-9b88471ea2b1 h1:2vETXw+UU6pGtU9a85nwX2OUHDvup3JKPZ7bIAx3fBY=
k8s.io/api v0.0.0-20220509182417-9b88471ea2b1/go.mod h1:sZiCpVOT/SE/nrwAAwOLnekwXoB+cNDNfNMmYNckG3k=
k8s.io/apimachinery v0.0.0-20220509181918-47789511e916 h1:EK7ESaLKImLW0V+N2iBxxPqBHXANWPbjBnUs880QPGE=
k8s.io/apimachinery v0.0.0-20220509181918-47789511e916/go.mod h1:1oBVxgNUfLl978lJAlywA+H45m2ctSuqJU2stpbcjT4=
k8s.io/client-go v0.0.0-20220509183031-24209264f769 h1:MW8ja2H89n33gMx5yvgW/LvnujVYEiLrjJbKr+1rgwo=
k8s.io/client-go v0.0.0-20220509183031-24209264f769/go.mod h1:LERxHwMnv6pzVnFf3me521m0kSUunRTWuBs6wxTn9DI=
k8s.io/component-base v0.0.0-20220509184231-9da133a032bf h1:0TuYOEm7trLhWVadF11I12U++mIHr+02kgJBe8+SUdc=
k8s.io/component-base v0.0.0-20220509184231-9da133a032bf/go.mod h1:rgm2aW3vatuxl3Drh0QYdQbmlSaOsj9RdFa+GeDIZyg=
k8s.io/api v0.0.0-20220510154143-ae35a85329f1 h1:yZN3QEsAnbRVMdS2+kSdD4NY7M23TUKydf8rAkD+FLA=
k8s.io/api v0.0.0-20220510154143-ae35a85329f1/go.mod h1:sZiCpVOT/SE/nrwAAwOLnekwXoB+cNDNfNMmYNckG3k=
k8s.io/apimachinery v0.0.0-20220511125320-f3b1305f4010 h1:UDqeN/6SuN6isZIYNS8S4p1Dhd4uxH0eR18ntskjI4s=
k8s.io/apimachinery v0.0.0-20220511125320-f3b1305f4010/go.mod h1:1oBVxgNUfLl978lJAlywA+H45m2ctSuqJU2stpbcjT4=
k8s.io/client-go v0.0.0-20220511125810-77f63643f951 h1:CI8FWQypSXMEVUrNTuCxHPyG/19SX0AB6ihaqrZGpGg=
k8s.io/client-go v0.0.0-20220511125810-77f63643f951/go.mod h1:awH00PAhyUA+Xv39TCA9oXHusorGYZNpVRvC0pcRcAE=
k8s.io/component-base v0.0.0-20220510143133-ee36a6a1367e h1:vkKEhffaC9iR/Ql71tIJz7IqPlSG2nLxDA2IdFgjtZc=
k8s.io/component-base v0.0.0-20220510143133-ee36a6a1367e/go.mod h1:rgm2aW3vatuxl3Drh0QYdQbmlSaOsj9RdFa+GeDIZyg=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=

View File

@ -462,7 +462,9 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}
c.ready.wait()
if err := c.ready.wait(); err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}
triggerValue, triggerSupported := "", false
if c.indexedTrigger != nil {
@ -559,7 +561,9 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
c.ready.wait()
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
@ -644,7 +648,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
@ -1011,6 +1017,7 @@ func (c *Cacher) Stop() {
return
}
c.stopped = true
c.ready.stop()
c.stopLock.Unlock()
close(c.stopCh)
c.stopWg.Wait()
@ -1040,7 +1047,9 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
if err := c.ready.wait(); err != nil {
return 0, errors.NewServiceUnavailable(err.Error())
}
resourceVersion := c.reflector.LastSyncResourceVersion()
return c.versioner.ParseResourceVersion(resourceVersion)
@ -1426,36 +1435,3 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
}
}
}
type ready struct {
ok bool
c *sync.Cond
}
func newReady() *ready {
return &ready{c: sync.NewCond(&sync.RWMutex{})}
}
func (r *ready) wait() {
r.c.L.Lock()
for !r.ok {
r.c.Wait()
}
r.c.L.Unlock()
}
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
func (r *ready) check() bool {
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
return r.ok
}
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.ok = ok
r.c.Broadcast()
}

View File

@ -337,7 +337,9 @@ func TestGetListCacheBypass(t *testing.T) {
result := &example.PodList{}
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
@ -374,7 +376,9 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
result := &example.PodList{}
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
@ -406,7 +410,9 @@ func TestGetCacheBypass(t *testing.T) {
result := &example.Pod{}
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
@ -436,7 +442,9 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
@ -561,7 +569,9 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
@ -591,6 +601,68 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
}
func TestCacheDontAcceptRequestsStopped(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
watchClosed := make(chan struct{})
go func() {
defer close(watchClosed)
for event := range w.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
// ok
default:
t.Errorf("unexpected event %#v", event)
}
}
}()
cacher.Stop()
_, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err == nil {
t.Fatalf("Success to create Watch: %v", err)
}
result := &example.Pod{}
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "1",
}, result)
if err == nil {
t.Fatalf("Success to create Get: %v", err)
}
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "1",
Recursive: true,
}, result)
if err == nil {
t.Fatalf("Success to create GetList: %v", err)
}
select {
case <-watchClosed:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for watch to close")
}
}
func TestTimeBucketWatchersBasic(t *testing.T) {
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
return true
@ -642,7 +714,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
@ -738,7 +812,9 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = allowWatchBookmarks
@ -836,7 +912,9 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
@ -904,7 +982,9 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
@ -980,7 +1060,9 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
@ -1056,7 +1138,9 @@ func TestStartingResourceVersion(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is some budget for slowing down processing.
// We use the fakeTimeBudget to prevent this test from flaking under
@ -1134,7 +1218,9 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is some budget for slowing down processing.
// We use the fakeTimeBudget to prevent this test from flaking under
@ -1243,7 +1329,9 @@ func TestCachingDeleteEvents(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
fooPredicate := storage.SelectionPredicate{
Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
@ -1323,7 +1411,9 @@ func testCachingObjects(t *testing.T, watchersCount int) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
dispatchedEvents := []*watchCacheEvent{}
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
@ -1417,7 +1507,9 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is enough budget for slow processing since
// the entire watch cache is going to be served through the
// interval and events won't be popped from the cacheWatcher's

View File

@ -0,0 +1,96 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"sync"
)
type status int
const (
Pending status = iota
Ready
Stopped
)
// ready is a three state condition variable that blocks until is Ready if is not Stopped.
// Its initial state is Pending.
type ready struct {
state status
c *sync.Cond
}
func newReady() *ready {
return &ready{
c: sync.NewCond(&sync.RWMutex{}),
state: Pending,
}
}
// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
func (r *ready) wait() error {
r.c.L.Lock()
defer r.c.L.Unlock()
for r.state == Pending {
r.c.Wait()
}
switch r.state {
case Ready:
return nil
case Stopped:
return fmt.Errorf("apiserver cacher is stopped")
default:
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
}
}
// check returns true only if it is Ready.
func (r *ready) check() bool {
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
return r.state == Ready
}
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
if r.state == Stopped {
return
}
if ok {
r.state = Ready
} else {
r.state = Pending
}
r.c.Broadcast()
}
// stop the condition variable and set it as Stopped. This state is irreversible.
func (r *ready) stop() {
r.c.L.Lock()
defer r.c.L.Unlock()
if r.state != Stopped {
r.state = Stopped
r.c.Broadcast()
}
}

View File

@ -0,0 +1,83 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"testing"
)
func Test_newReady(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady()
ready.set(false)
// create 10 goroutines waiting for ready
for i := 0; i < 10; i++ {
go func() {
errCh <- ready.wait()
}()
}
ready.set(true)
for i := 0; i < 10; i++ {
if err := <-errCh; err != nil {
t.Errorf("unexpected error on channel %d", i)
}
}
}
func Test_newReadyStop(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady()
ready.set(false)
// create 10 goroutines waiting for ready and stop
for i := 0; i < 10; i++ {
go func() {
errCh <- ready.wait()
}()
}
ready.stop()
for i := 0; i < 10; i++ {
if err := <-errCh; err == nil {
t.Errorf("unexpected success on channel %d", i)
}
}
}
func Test_newReadyCheck(t *testing.T) {
ready := newReady()
// it starts as false
if ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
}
ready.set(true)
if !ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
}
// stop sets ready to false
ready.stop()
if ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
}
// can not set to true if is stopped
ready.set(true)
if ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
}
err := ready.wait()
if err == nil {
t.Errorf("expected error waiting on a stopped state")
}
}