storage/etcd: the watcher supports the API streaming
Kubernetes-commit: ca562fd280a9dd5db952eb8c14b93fcf6668cc49
This commit is contained in:
parent
10be293530
commit
90ba08909c
|
@ -30,6 +30,17 @@ type event struct {
|
|||
isDeleted bool
|
||||
isCreated bool
|
||||
isProgressNotify bool
|
||||
// isInitialEventsEndBookmark marks this event as the bookmark
|
||||
// that is sent after the initial events have been delivered.
|
||||
//
|
||||
// when this variable is set to true,
|
||||
// a special annotation will be added
|
||||
// to the bookmark event.
|
||||
//
|
||||
// note that we decided to extend the event
|
||||
// struct with this field to eliminate contention
|
||||
// between startWatching and processEvent
|
||||
isInitialEventsEndBookmark bool
|
||||
}
|
||||
|
||||
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
|
@ -112,12 +111,11 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
|
|||
pathPrefix += "/"
|
||||
}
|
||||
|
||||
// TODO(p0lyn0mial): pass newListFunc and resourcePrefix to the watcher
|
||||
w := &watcher{
|
||||
client: c,
|
||||
codec: codec,
|
||||
groupResource: groupResource,
|
||||
newFunc: newFunc,
|
||||
groupResource: groupResource,
|
||||
versioner: versioner,
|
||||
transformer: transformer,
|
||||
}
|
||||
|
@ -126,7 +124,6 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
|
|||
} else {
|
||||
w.objectType = reflect.TypeOf(newFunc()).String()
|
||||
}
|
||||
|
||||
s := &store{
|
||||
client: c,
|
||||
codec: codec,
|
||||
|
@ -139,6 +136,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
|
|||
watcher: w,
|
||||
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||
}
|
||||
|
||||
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
||||
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -855,18 +856,7 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
|
|||
}
|
||||
|
||||
// Watch implements storage.Interface.Watch.
|
||||
// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it.
|
||||
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||
// it is safe to skip SendInitialEvents if the request is backward compatible
|
||||
// see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260
|
||||
compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0")
|
||||
if opts.SendInitialEvents != nil && !compatibility {
|
||||
return nil, apierrors.NewInvalid(
|
||||
schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource},
|
||||
"",
|
||||
field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is unsupported by an etcd cluster")},
|
||||
)
|
||||
}
|
||||
preparedKey, err := s.prepareKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -26,20 +26,21 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
grpccodes "google.golang.org/grpc/codes"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
|
@ -67,13 +68,14 @@ func TestOnlySetFatalOnDecodeError(b bool) {
|
|||
}
|
||||
|
||||
type watcher struct {
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
newFunc func() runtime.Object
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
newFunc func() runtime.Object
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
getCurrentStorageRV func(context.Context) (uint64, error)
|
||||
}
|
||||
|
||||
// watchChan implements watch.Interface.
|
||||
|
@ -105,8 +107,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage
|
|||
if opts.ProgressNotify && w.newFunc == nil {
|
||||
return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate)
|
||||
go wc.run()
|
||||
startWatchRV, err := w.getStartWatchResourceVersion(ctx, rev, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, startWatchRV, opts.Recursive, opts.ProgressNotify, opts.Predicate)
|
||||
go wc.run(isInitialEventsEndBookmarkRequired(opts), areInitialEventsRequired(rev, opts))
|
||||
|
||||
// For etcd watch we don't have an easy way to answer whether the watch
|
||||
// has already caught up. So in the initial version (given that watchcache
|
||||
|
@ -138,6 +144,62 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
|
|||
return wc
|
||||
}
|
||||
|
||||
// getStartWatchResourceVersion returns a ResourceVersion
|
||||
// the watch will be started from.
|
||||
// Depending on the input parameters the semantics of the returned ResourceVersion are:
|
||||
// - start at Exact (return resourceVersion)
|
||||
// - start at Most Recent (return an RV from etcd)
|
||||
func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) {
|
||||
if resourceVersion > 0 {
|
||||
return resourceVersion, nil
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||
return 0, nil
|
||||
}
|
||||
if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
|
||||
// note that when opts.SendInitialEvents=true
|
||||
// we will be issuing a consistent LIST request
|
||||
// against etcd followed by the special bookmark event
|
||||
return 0, nil
|
||||
}
|
||||
// at this point the client is interested
|
||||
// only in getting a stream of events
|
||||
// starting at the MostRecent point in time (RV)
|
||||
currentStorageRV, err := w.getCurrentStorageRV(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// currentStorageRV is taken from resp.Header.Revision (int64)
|
||||
// and cast to uint64, so it is safe to do the reverse cast.
|
||||
// At some point we should unify the interface, but that
|
||||
// would require changing Versioner.UpdateList
|
||||
return int64(currentStorageRV), nil
|
||||
}
|
||||
|
||||
// isInitialEventsEndBookmarkRequired reports whether a bookmark event is required.
|
||||
// Since there is no way to directly set opts.ProgressNotify from the API
|
||||
// and the etcd3 impl doesn't support notifications for external clients,
|
||||
// the bookmark event is sent only after the initial list call.
|
||||
//
|
||||
// see: https://github.com/kubernetes/kubernetes/issues/120348
|
||||
func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||
return false
|
||||
}
|
||||
return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
|
||||
}
|
||||
|
||||
// areInitialEventsRequired returns true if the watch must first sync with the latest state in etcd and send it as initial events.
|
||||
func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool {
|
||||
if opts.SendInitialEvents == nil && resourceVersion == 0 {
|
||||
return true // legacy case
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||
return false
|
||||
}
|
||||
return opts.SendInitialEvents != nil && *opts.SendInitialEvents
|
||||
}
|
||||
|
||||
type etcdError interface {
|
||||
Code() grpccodes.Code
|
||||
Error() string
|
||||
|
@ -163,9 +225,9 @@ func isCancelError(err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (wc *watchChan) run() {
|
||||
func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {
|
||||
watchClosedCh := make(chan struct{})
|
||||
go wc.startWatching(watchClosedCh)
|
||||
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
|
||||
|
||||
var resultChanWG sync.WaitGroup
|
||||
resultChanWG.Add(1)
|
||||
|
@ -284,14 +346,44 @@ func logWatchChannelErr(err error) {
|
|||
// startWatching does:
|
||||
// - get current objects if forceInitialEvents is true; set initialRev to current rev
|
||||
// - watch on given key and send events to process.
|
||||
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
|
||||
if wc.initialRev == 0 {
|
||||
//
|
||||
// initialEventsEndBookmarkRequired tells us
|
||||
// whether we need to send an annotated bookmark event.
|
||||
//
|
||||
// it's important to note that we don't
|
||||
// need to track the actual RV because
|
||||
// we only send the bookmark event
|
||||
// after the initial list call.
|
||||
//
|
||||
// when this variable is set to false,
|
||||
// it means we don't have any specific
|
||||
// preferences for delivering bookmark events.
|
||||
func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEndBookmarkRequired, forceInitialEvents bool) {
|
||||
if wc.initialRev > 0 && forceInitialEvents {
|
||||
currentStorageRV, err := wc.watcher.getCurrentStorageRV(wc.ctx)
|
||||
if err != nil {
|
||||
wc.sendError(err)
|
||||
return
|
||||
}
|
||||
if uint64(wc.initialRev) > currentStorageRV {
|
||||
wc.sendError(storage.NewTooLargeResourceVersionError(uint64(wc.initialRev), currentStorageRV, int(wait.Jitter(1*time.Second, 3).Seconds())))
|
||||
return
|
||||
}
|
||||
}
|
||||
if forceInitialEvents {
|
||||
if err := wc.sync(); err != nil {
|
||||
klog.Errorf("failed to sync with latest state: %v", err)
|
||||
wc.sendError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if initialEventsEndBookmarkRequired {
|
||||
wc.sendEvent(func() *event {
|
||||
e := progressNotifyEvent(wc.initialRev)
|
||||
e.isInitialEventsEndBookmark = true
|
||||
return e
|
||||
}())
|
||||
}
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
|
||||
if wc.recursive {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
|
@ -388,6 +480,12 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||
klog.Errorf("failed to propagate object version: %v", err)
|
||||
return nil
|
||||
}
|
||||
if e.isInitialEventsEndBookmark {
|
||||
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
|
||||
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
res = &watch.Event{
|
||||
Type: watch.Bookmark,
|
||||
Object: object,
|
||||
|
|
|
@ -24,9 +24,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
|
@ -35,6 +39,7 @@ import (
|
|||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
|
@ -123,6 +128,16 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
|
|||
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestEtcdWatchSemantics(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunWatchSemantics(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
||||
}
|
||||
|
||||
// =======================================================================
|
||||
// Implementation-specific tests are following.
|
||||
// The following tests are exercising the details of the implementation
|
||||
|
@ -145,7 +160,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
w.run()
|
||||
w.run(false, true)
|
||||
wg.Done()
|
||||
}()
|
||||
w.errChan <- fmt.Errorf("some error")
|
||||
|
@ -194,6 +209,51 @@ func TestWatchErrorIncorrectConfiguration(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
|
||||
origCtx, store, _ := testSetup(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
defer cancel()
|
||||
requestOpts := storage.ListOptions{
|
||||
SendInitialEvents: ptr.To(true),
|
||||
Recursive: true,
|
||||
Predicate: storage.SelectionPredicate{
|
||||
Field: fields.Everything(),
|
||||
Label: labels.Everything(),
|
||||
AllowWatchBookmarks: true,
|
||||
},
|
||||
}
|
||||
var expectedErr *apierrors.StatusError
|
||||
if !errors.As(storage.NewTooLargeResourceVersionError(uint64(102), 1, 0), &expectedErr) {
|
||||
t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError")
|
||||
}
|
||||
|
||||
w, err := store.watcher.Watch(ctx, "/abc", int64(102), requestOpts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Stop()
|
||||
|
||||
actualEvent := <-w.ResultChan()
|
||||
if actualEvent.Type != watch.Error {
|
||||
t.Fatalf("Unexpected type of the event: %v, expected: %v", actualEvent.Type, watch.Error)
|
||||
}
|
||||
actualErr, ok := actualEvent.Object.(*metav1.Status)
|
||||
if !ok {
|
||||
t.Fatalf("Expected *apierrors.StatusError, got: %#v", actualEvent.Object)
|
||||
}
|
||||
|
||||
if actualErr.Details.RetryAfterSeconds <= 0 {
|
||||
t.Fatalf("RetryAfterSeconds must be > 0, actual value: %v", actualErr.Details.RetryAfterSeconds)
|
||||
}
|
||||
// rewrite the Details as it contains retry seconds
|
||||
// and validate the whole struct
|
||||
expectedErr.ErrStatus.Details = actualErr.Details
|
||||
if diff := cmp.Diff(*actualErr, expectedErr.ErrStatus); diff != "" {
|
||||
t.Fatalf("Unexpected error returned, diff: %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchChanSync(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
|
Loading…
Reference in New Issue