Merge pull request #117137 from p0lyn0mial/upstream-streaming-api-deadlock
cacher: prevent a potential deadlock

Kubernetes-commit: afcc207feb0da6c5866dea33b07856091328f9ab
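Before the diff, a minimal, self-contained sketch of the hazard this merge removes. It is not the cacher's actual code: cache, waitFresh, and bump are illustrative stand-ins for the watchCache, its freshness wait, and processEvent. A goroutine that keeps an RWMutex read lock while waiting for a condition only a writer can satisfy starves that writer, so the wait can only end by timing out.

package main

import (
	"fmt"
	"sync"
	"time"
)

// cache mimics a store whose readers can wait for freshness and whose writer
// bumps a revision under the write lock.
type cache struct {
	mu   sync.RWMutex
	cond *sync.Cond
	rev  int
}

func newCache() *cache {
	c := &cache{}
	c.cond = sync.NewCond(c.mu.RLocker())
	return c
}

// waitFresh blocks until rev >= want. It takes the read lock itself, so it
// must NOT be called while the caller already holds c.mu.RLock().
func (c *cache) waitFresh(want int) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for c.rev < want {
		c.cond.Wait() // releases the read lock while sleeping
	}
}

// bump plays the role of the event processor: it needs the write lock.
func (c *cache) bump() {
	c.mu.Lock()
	c.rev++
	c.mu.Unlock()
	c.cond.Broadcast()
}

func main() {
	c := newCache()
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.bump() // cannot acquire the write lock while any reader holds RLock
	}()

	// Buggy ordering (what the fix removes): holding RLock across waitFresh
	// starves bump, so rev never advances and waitFresh never returns:
	//
	//	c.mu.RLock()
	//	c.waitFresh(1) // deadlock until some timeout fires
	//	c.mu.RUnlock()

	// Fixed ordering: wait first, take the read lock afterwards.
	c.waitFresh(1)
	c.mu.RLock()
	fmt.Println("rev =", c.rev) // rev = 1
	c.mu.RUnlock()
}

The cacher.go hunks below apply exactly this reordering: call c.waitUntilWatchCacheFreshAndForceAllEvents first, and only then take c.watchCache.RLock().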
commit cff2363f5e

go.mod (4 changed lines)
@@ -42,7 +42,7 @@ require (
 	google.golang.org/protobuf v1.28.1
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 	gopkg.in/square/go-jose.v2 v2.6.0
-	k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb
+	k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08
 	k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7
 	k8s.io/client-go v0.0.0-20230412040258-8005e0d28ba2
 	k8s.io/component-base v0.0.0-20230327183930-4fb97becef37
@@ -124,7 +124,7 @@ require (
 )

 replace (
-	k8s.io/api => k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb
+	k8s.io/api => k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08
 	k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7
 	k8s.io/client-go => k8s.io/client-go v0.0.0-20230412040258-8005e0d28ba2
 	k8s.io/component-base => k8s.io/component-base v0.0.0-20230327183930-4fb97becef37

go.sum (4 changed lines)
@@ -878,8 +878,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb h1:cfveC5neDeBviOYWd4lir9rh9A0es/5mJvZ5FWjyhOk=
-k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb/go.mod h1:vtqOH+6A0ypk7hxqunFCvaEIub+VUspnbLv5dNIfIuE=
+k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08 h1:x1R20/eaphFlQSm4SJC2evVwufVUt5yeOmQJgNjyn+Y=
+k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08/go.mod h1:cQ6M2OwzuocrtP+XOKc0I/oqo1E+CNAVuEVkvwXqG3I=
 k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7 h1:a4P0At3Mtt9Rqu98XWQ1nG/hlYDhYQiCxYWDEXTz/8c=
 k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7/go.mod h1:5ikh59fK3AJ287GUvpUsryoMFtH9zj/ARfWCo3AyXTM=
 k8s.io/client-go v0.0.0-20230412040258-8005e0d28ba2 h1:Mxfg52BtCwS9aQ0uqJSBtpQrDvFdbTZve06ttHHOqRk=

pkg/storage/cacher/cacher.go
@@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		identifier,
 	)

+	// note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without
+	// the c.watchCache.RLock held otherwise we are at risk of a deadlock
+	// mainly because c.watchCache.processEvent method won't be able to make progress
+	//
+	// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
+	// it is safe to release the lock after the method finishes because we don't require
+	// any atomicity between the call to the method and further calls that actually get the events.
+	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
+	if err != nil {
+		return newErrWatcher(err), nil
+	}
+
 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
 	// on return from this function.
@@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// underlying watchCache is calling processEvent under its lock.
 	c.watchCache.RLock()
 	defer c.watchCache.RUnlock()
-	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
-	if err != nil {
-		return newErrWatcher(err), nil
-	}
 	startWatchRV := startWatchResourceVersionFn()
 	var cacheInterval *watchCacheInterval
 	if forceAllEvents {
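On the second half of the new comment above (no atomicity needed between the freshness wait and the calls that actually get the events): events that arrive in the window between waitUntilWatchCacheFreshAndForceAllEvents returning and c.watchCache.RLock() being taken only move the cache forward, and the watch start point (startWatchRV) is read after the lock is held. A toy sketch of that property, with assumed names (eventLog, snapshotFrom) rather than the real watchCache types:

package main

import (
	"fmt"
	"sync"
)

// eventLog is a stand-in for an append-only cache of watch events.
type eventLog struct {
	mu     sync.RWMutex
	events []string
}

// add appends an event under the write lock, like processEvent would.
func (l *eventLog) add(e string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, e)
}

// snapshotFrom returns the events a new watcher should replay. It is called
// with the read lock held, after any freshness wait has already returned.
func (l *eventLog) snapshotFrom(start int) []string {
	return append([]string(nil), l.events[start:]...)
}

func main() {
	l := &eventLog{}
	l.add("pod/a ADDED") // already present when the freshness wait returns

	// ... the freshness wait returns here, with no lock held ...
	l.add("pod/b ADDED") // slips in before the read lock is taken

	l.mu.RLock()
	defer l.mu.RUnlock()
	fmt.Println(l.snapshotFrom(0)) // [pod/a ADDED pod/b ADDED]: nothing is lost
}

Whether the extra event lands before or after the lock is taken, the snapshot computed under the lock still contains it, which is why the two steps do not need to be atomic.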

pkg/storage/cacher/cacher_whitebox_test.go
@@ -1823,6 +1823,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
 }

 func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
 	if err != nil {
@@ -1830,17 +1831,41 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 	}
 	defer cacher.Stop()

-	forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
-	require.NotNil(t, err, "the target method should return non nil error")
-	require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100")
-	require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)")
+	opts := storage.ListOptions{
+		Predicate:         storage.Everything,
+		SendInitialEvents: pointer.Bool(true),
+		ResourceVersion:   "105",
+	}
+	opts.Predicate.AllowWatchBookmarks = true
+
+	w, err := cacher.Watch(context.Background(), "pods/ns", opts)
+	require.NoError(t, err, "failed to create watch: %v")
+	defer w.Stop()
+	verifyEvents(t, w, []watch.Event{
+		{
+			Type: watch.Error,
+			Object: &metav1.Status{
+				Status:  metav1.StatusFailure,
+				Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(),
+				Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details,
+				Reason:  metav1.StatusReasonTimeout,
+				Code:    504,
+			},
+		},
+	}, true)
+
 	go func() {
 		cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
 	}()
-	forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
-	require.NoError(t, err)
-	require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
+	w, err = cacher.Watch(context.Background(), "pods/ns", opts)
+	require.NoError(t, err, "failed to create watch: %v")
+	defer w.Stop()
+	verifyEvents(t, w, []watch.Event{
+		{
+			Type:   watch.Added,
+			Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
+		},
+	}, true)
 }

 type fakeStorage struct {
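The rewritten test drives the scenario through the public Cacher.Watch path instead of the private helper, so it asserts on watch events: first a Timeout error event (the "Too large resource version" case) while the cache is still behind resource version 105, then an Added event once the cache catches up. Outside this whitebox test, a consumer of a watch.Interface could check for the same terminal error event roughly as below; this is a hedged sketch, and expectTooLargeRV, the package name, and the ten-second deadline are illustrative rather than part of the change.

package watchhelpers

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// expectTooLargeRV waits for the first event from w and checks that it is the
// kind of terminal error the test above expects: a metav1.Status carrying a
// Timeout reason.
func expectTooLargeRV(w watch.Interface) error {
	select {
	case ev, ok := <-w.ResultChan():
		if !ok {
			return fmt.Errorf("watch closed before delivering any event")
		}
		if ev.Type != watch.Error {
			return fmt.Errorf("expected watch.Error, got %v", ev.Type)
		}
		status, isStatus := ev.Object.(*metav1.Status)
		if !isStatus || status.Reason != metav1.StatusReasonTimeout {
			return fmt.Errorf("expected a Timeout *metav1.Status, got %#v", ev.Object)
		}
		return nil
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timed out waiting for the error event")
	}
}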