xds: implement ADS stream flow control mechanism (#7458)

This commit is contained in:
Easwar Swaminathan 2024-08-12 07:32:53 -07:00 committed by GitHub
parent 54b48f7e46
commit ced812e328
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1107 additions and 215 deletions

View File

@ -42,7 +42,6 @@ func CreateBootstrapFileForTesting(t *testing.T, bootstrapContents []byte) {
if err := os.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
t.Fatalf("Failed to created bootstrap file: %v", err)
}
t.Logf("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)
origBootstrapFileName := envconfig.XDSBootstrapFileName
envconfig.XDSBootstrapFileName = f.Name()

View File

@ -600,10 +600,6 @@ func newConfigFromContents(data []byte) (*Config, error) {
if err := config.UnmarshalJSON(data); err != nil {
return nil, err
}
if logger.V(2) {
logger.Infof("Bootstrap config for creating xds-client: %s", config)
}
return config, nil
}

View File

@ -71,27 +71,51 @@ func Test(t *testing.T) {
type unimplementedListenerWatcher struct{}
func (unimplementedListenerWatcher) OnUpdate(*xdsresource.ListenerResourceData) {}
func (unimplementedListenerWatcher) OnError(error) {}
func (unimplementedListenerWatcher) OnResourceDoesNotExist() {}
func (unimplementedListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
type unimplementedRouteConfigWatcher struct{}
func (unimplementedRouteConfigWatcher) OnUpdate(*xdsresource.RouteConfigResourceData) {}
func (unimplementedRouteConfigWatcher) OnError(error) {}
func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist() {}
func (unimplementedRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
type unimplementedClusterWatcher struct{}
func (unimplementedClusterWatcher) OnUpdate(*xdsresource.ClusterResourceData) {}
func (unimplementedClusterWatcher) OnError(error) {}
func (unimplementedClusterWatcher) OnResourceDoesNotExist() {}
func (unimplementedClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
type unimplementedEndpointsWatcher struct{}
func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData) {}
func (unimplementedEndpointsWatcher) OnError(error) {}
func (unimplementedEndpointsWatcher) OnResourceDoesNotExist() {}
func (unimplementedEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
func (unimplementedEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
}
// Creates a gRPC server and starts serving a CSDS service implementation on it.
// Returns the address of the newly created gRPC server.

View File

@ -32,22 +32,19 @@ type clusterWatcher struct {
parent *cdsBalancer
}
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterUpdate(cw.name, u.Resource)
})
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
}
func (cw *clusterWatcher) OnError(err error) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterError(cw.name, err)
})
func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
}
func (cw *clusterWatcher) OnResourceDoesNotExist() {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterResourceNotFound(cw.name)
})
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
}
// watcherState groups the state associated with a clusterWatcher.

View File

@ -207,11 +207,6 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
// handleResourceUpdate handles a resource update or error from the resource
// resolver by propagating the same to the child LB policy.
func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
if err := update.err; err != nil {
b.handleErrorFromUpdate(err, false)
return
}
b.watchUpdateReceived = true
b.priorities = update.priorities
@ -219,6 +214,10 @@ func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
// for all configured discovery mechanisms ordered by priority. This is used
// to generate configuration for the priority LB policy.
b.updateChildConfig()
if update.onDone != nil {
update.onDone.OnDone()
}
}
// updateChildConfig builds child policy configuration using endpoint addresses

View File

@ -30,8 +30,14 @@ import (
// resourceUpdate is a combined update from all the resources, in the order of
// priority. For example, it can be {EDS, EDS, DNS}.
type resourceUpdate struct {
// A discovery mechanism would return an empty update when it runs into
// errors, and this would result in the priority LB policy reporting
// TRANSIENT_FAILURE (if there was a single discovery mechanism), or would
// fallback to the next highest priority that is available.
priorities []priorityConfig
err error
// To be invoked once the update is completely processed, or is dropped in
// favor of a newer update.
onDone xdsresource.DoneNotifier
}
// topLevelResolver is used by concrete endpointsResolver implementations for
@ -39,7 +45,11 @@ type resourceUpdate struct {
// interface and takes appropriate actions upon receipt of updates and errors
// from underlying concrete resolvers.
type topLevelResolver interface {
onUpdate()
// onUpdate is called when a new update is received from the underlying
// endpointsResolver implementation. The onDone callback is to be invoked
// once the update is completely processed, or is dropped in favor of a
// newer update.
onUpdate(onDone xdsresource.DoneNotifier)
}
// endpointsResolver wraps the functionality to resolve a given resource name to
@ -205,7 +215,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
}
// Regenerate even if there's no change in discovery mechanism, in case
// priority order changed.
rr.generateLocked()
rr.generateLocked(xdsresource.NopDoneNotifier{})
}
// resolveNow is typically called to trigger re-resolve of DNS. The EDS
@ -252,7 +262,10 @@ func (rr *resourceResolver) stop(closing bool) {
// after they are stopped. Therefore, we don't have to worry about another
// write to this channel happening at the same time as this one.
select {
case <-rr.updateChannel:
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.OnDone()
}
default:
}
rr.updateChannel <- &resourceUpdate{}
@ -262,14 +275,20 @@ func (rr *resourceResolver) stop(closing bool) {
// result on the update channel if all child resolvers have received at least
// one update. Otherwise it returns early.
//
// caller must hold rr.mu.
func (rr *resourceResolver) generateLocked() {
// The onDone callback is invoked inline if not all child resolvers have
// received at least one update. If all child resolvers have received at least
// one update, onDone is invoked when the combined update is processed by the
// clusterresolver LB policy.
//
// Caller must hold rr.mu.
func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
var ret []priorityConfig
for _, rDM := range rr.children {
u, ok := rDM.r.lastUpdate()
if !ok {
// Don't send updates to parent until all resolvers have update to
// send.
onDone.OnDone()
return
}
switch uu := u.(type) {
@ -280,16 +299,23 @@ func (rr *resourceResolver) generateLocked() {
}
}
select {
case <-rr.updateChannel:
// A previously unprocessed update is dropped in favor of the new one, and
// the former's onDone callback is invoked to unblock the xDS client's
// receive path.
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.OnDone()
}
default:
}
rr.updateChannel <- &resourceUpdate{priorities: ret}
rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
}
func (rr *resourceResolver) onUpdate() {
rr.serializer.TrySchedule(func(context.Context) {
func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) {
rr.mu.Lock()
rr.generateLocked()
rr.generateLocked(onDone)
rr.mu.Unlock()
})
}
rr.serializer.ScheduleOr(handleUpdate, func() { onDone.OnDone() })
}

View File

@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
var (
@ -79,7 +80,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
return ret
}
@ -89,7 +90,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
return ret
}
ret.dnsR = r
@ -153,7 +154,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
dr.updateReceived = true
dr.mu.Unlock()
dr.topLevelResolver.onUpdate()
dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
return nil
}
@ -176,7 +177,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
dr.updateReceived = true
dr.mu.Unlock()
dr.topLevelResolver.onUpdate()
dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
}
func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {

View File

@ -76,8 +76,9 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}
// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
if er.stopped.HasFired() {
onDone.OnDone()
return
}
@ -85,11 +86,12 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD
er.update = &update.Resource
er.mu.Unlock()
er.topLevelResolver.onUpdate()
er.topLevelResolver.onUpdate(onDone)
}
func (er *edsDiscoveryMechanism) OnError(err error) {
func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) {
if er.stopped.HasFired() {
onDone.OnDone()
return
}
@ -102,6 +104,7 @@ func (er *edsDiscoveryMechanism) OnError(err error) {
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone.OnDone()
return
}
@ -114,11 +117,12 @@ func (er *edsDiscoveryMechanism) OnError(err error) {
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()
er.topLevelResolver.onUpdate()
er.topLevelResolver.onUpdate(onDone)
}
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
if er.stopped.HasFired() {
onDone.OnDone()
return
}
@ -136,5 +140,5 @@ func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()
er.topLevelResolver.onUpdate()
er.topLevelResolver.onUpdate(onDone)
}

View File

@ -36,22 +36,19 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceUpdate(update.Resource)
})
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
}
func (l *listenerWatcher) OnError(err error) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceError(err)
})
func (l *listenerWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
}
func (l *listenerWatcher) OnResourceDoesNotExist() {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceNotFound()
})
func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
}
func (l *listenerWatcher) stop() {
@ -71,22 +68,22 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}
func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
})
func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
onDone.OnDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
}
func (r *routeConfigWatcher) OnError(err error) {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceError(r.resourceName, err)
})
func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone.OnDone() }
r.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
}
func (r *routeConfigWatcher) OnResourceDoesNotExist() {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceNotFound(r.resourceName)
})
func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone.OnDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
}
func (r *routeConfigWatcher) stop() {

View File

@ -410,7 +410,8 @@ type ldsWatcher struct {
name string
}
func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
return
@ -421,7 +422,8 @@ func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
lw.parent.handleLDSUpdate(update.Resource)
}
func (lw *ldsWatcher) OnError(err error) {
func (lw *ldsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err)
return
@ -433,7 +435,8 @@ func (lw *ldsWatcher) OnError(err error) {
// continue to use the old configuration.
}
func (lw *ldsWatcher) OnResourceDoesNotExist() {
func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name)
return

View File

@ -147,7 +147,8 @@ type rdsWatcher struct {
canceled bool // eats callbacks if true
}
func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
@ -160,7 +161,8 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
}
func (rw *rdsWatcher) OnError(err error) {
func (rw *rdsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
@ -173,7 +175,8 @@ func (rw *rdsWatcher) OnError(err error) {
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
}
func (rw *rdsWatcher) OnResourceDoesNotExist() {
func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()

View File

@ -37,7 +37,8 @@ type TestResourceWatcher struct {
// OnUpdate is invoked by the xDS client to report the latest update on the resource
// being watched.
func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) {
func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
select {
case <-w.UpdateCh:
default:
@ -46,7 +47,8 @@ func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) {
}
// OnError is invoked by the xDS client to report the latest error.
func (w *TestResourceWatcher) OnError(err error) {
func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
select {
case <-w.ErrorCh:
default:
@ -56,7 +58,8 @@ func (w *TestResourceWatcher) OnError(err error) {
// OnResourceDoesNotExist is used by the xDS client to report that the resource
// being watched no longer exists.
func (w *TestResourceWatcher) OnResourceDoesNotExist() {
func (w *TestResourceWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
defer onDone.OnDone()
select {
case <-w.ResourceDoesNotExistCh:
default:

View File

@ -148,7 +148,7 @@ func (a *authority) transportOnSendHandler(u *transport.ResourceSendInfo) {
a.startWatchTimersLocked(rType, u.ResourceNames)
}
func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate) error {
func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate, fc *transport.ADSFlowControl) error {
rType := a.resourceTypeGetter(resourceUpdate.URL)
if rType == nil {
return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
@ -159,14 +159,27 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate
ServerConfig: a.serverCfg,
}
updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
a.updateResourceStateAndScheduleCallbacks(rType, updates, md, fc)
return err
}
func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata) {
func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata, fc *transport.ADSFlowControl) {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
// We build a list of callback funcs to invoke, and invoke them at the end
// of this method instead of inline (when handling the update for a
// particular resource), because we want to make sure that all calls to
// `fc.Add` happen before any callbacks are invoked. This will ensure that
// the next read is never attempted before all callbacks are invoked, and
// the watchers have processed the update.
funcsToSchedule := []func(context.Context){}
defer func() {
for _, f := range funcsToSchedule {
a.serializer.ScheduleOr(f, fc.OnDone)
}
}()
resourceStates := a.resources[rType]
for name, uErr := range updates {
if state, ok := resourceStates[name]; ok {
@ -210,7 +223,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty
for watcher := range state.watchers {
watcher := watcher
err := uErr.err
a.serializer.TrySchedule(func(context.Context) { watcher.OnError(err) })
fc.Add()
funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnError(err, fc) })
}
continue
}
@ -225,7 +239,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty
for watcher := range state.watchers {
watcher := watcher
resource := uErr.resource
a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource) })
fc.Add()
funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnUpdate(resource, fc) })
}
}
// Sync cache.
@ -300,7 +315,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty
state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
for watcher := range state.watchers {
watcher := watcher
a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
fc.Add()
funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(fc) })
}
}
}
@ -429,7 +445,7 @@ func (a *authority) newConnectionError(err error) {
for watcher := range state.watchers {
watcher := watcher
a.serializer.TrySchedule(func(context.Context) {
watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), xdsresource.NopDoneNotifier{})
})
}
}
@ -495,7 +511,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
resource := state.cache
a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource) })
a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, xdsresource.NopDoneNotifier{}) })
}
return func() {
@ -548,7 +564,7 @@ func (a *authority) handleWatchTimerExpiryLocked(rType xdsresource.Type, resourc
state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
for watcher := range state.watchers {
watcher := watcher
a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(xdsresource.NopDoneNotifier{}) })
}
}
@ -574,7 +590,7 @@ func (a *authority) triggerResourceNotFoundForTesting(rType xdsresource.Type, re
state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
for watcher := range state.watchers {
watcher := watcher
a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(xdsresource.NopDoneNotifier{}) })
}
}

View File

@ -78,7 +78,7 @@ func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Du
if err != nil {
return nil, nil, err
}
c.logger.Infof("Created client with name %q to primary xDS management server: %q", name, config.XDSServers()[0])
c.logger.Infof("Created client with name %q and bootstrap configuration:\n %s", name, config)
client := &clientRefCounted{clientImpl: c, refCount: 1}
clients[name] = client
xdsClientImplCreateHook(name)

View File

@ -44,7 +44,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
if err := c.resourceTypes.maybeRegister(rType); err != nil {
logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName)
c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err) })
c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, xdsresource.NopDoneNotifier{}) })
return func() {}
}
@ -54,7 +54,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
a, unref, err := c.findAuthority(n)
if err != nil {
logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority)
c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err) })
c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, xdsresource.NopDoneNotifier{}) })
return func() {}
}
cancelF := a.watchResource(rType, n.String(), watcher)

View File

@ -0,0 +1,25 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package internal contains functionality internal to the xdsclient package.
package internal
// The following vars can be overridden by tests.
var (
// NewADSStream is a function that returns a new ADS stream.
NewADSStream any // func(context.Context, *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error)
)

View File

@ -0,0 +1,623 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient_test
import (
"context"
"errors"
"slices"
"sort"
"testing"
"time"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
// blockingListenerWatcher implements xdsresource.ListenerWatcher. It writes to
// a channel when it receives a callback from the watch. It also makes the
// DoneNotifier passed to the callback available to the test, thereby enabling
// the test to block this watcher for as long as required.
type blockingListenerWatcher struct {
doneNotifierCh chan xdsresource.DoneNotifier // DoneNotifier passed to the callback.
updateCh chan struct{} // Written to when an update is received.
errorCh chan struct{} // Written to when an error is received.
notFoundCh chan struct{} // Written to when the resource is not found.
}
func newBLockingListenerWatcher() *blockingListenerWatcher {
return &blockingListenerWatcher{
doneNotifierCh: make(chan xdsresource.DoneNotifier, 1),
updateCh: make(chan struct{}, 1),
errorCh: make(chan struct{}, 1),
notFoundCh: make(chan struct{}, 1),
}
}
func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) {
// Notify receipt of the update.
select {
case lw.updateCh <- struct{}{}:
default:
}
select {
case lw.doneNotifierCh <- done:
default:
}
}
func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.DoneNotifier) {
// Notify receipt of an error.
select {
case lw.errorCh <- struct{}{}:
default:
}
select {
case lw.doneNotifierCh <- done:
default:
}
}
func (lw *blockingListenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
// Notify receipt of resource not found.
select {
case lw.notFoundCh <- struct{}{}:
default:
}
select {
case lw.doneNotifierCh <- done:
default:
}
}
type wrappedADSStream struct {
v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
recvCh chan struct{}
doneCh <-chan struct{}
}
func newWrappedADSStream(stream v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, doneCh <-chan struct{}) *wrappedADSStream {
return &wrappedADSStream{
AggregatedDiscoveryService_StreamAggregatedResourcesClient: stream,
recvCh: make(chan struct{}, 1),
doneCh: doneCh,
}
}
func (w *wrappedADSStream) Recv() (*v3discoverypb.DiscoveryResponse, error) {
select {
case w.recvCh <- struct{}{}:
case <-w.doneCh:
return nil, errors.New("Recv() called after the test has finished")
}
return w.AggregatedDiscoveryService_StreamAggregatedResourcesClient.Recv()
}
// Overrides the function to create a new ADS stream (used by the xdsclient
// transport), and returns a wrapped ADS stream, where the test can monitor
// Recv() calls.
func overrideADSStreamCreation(t *testing.T) chan *wrappedADSStream {
t.Helper()
adsStreamCh := make(chan *wrappedADSStream, 1)
origNewADSStream := xdsclientinternal.NewADSStream
xdsclientinternal.NewADSStream = func(ctx context.Context, cc *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) {
s, err := v3adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx)
if err != nil {
return nil, err
}
ws := newWrappedADSStream(s, ctx.Done())
select {
case adsStreamCh <- ws:
default:
}
return ws, nil
}
t.Cleanup(func() { xdsclientinternal.NewADSStream = origNewADSStream })
return adsStreamCh
}
// Creates an xDS client with the given bootstrap contents.
func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient {
t.Helper()
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
Contents: bootstrapContents,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
t.Cleanup(close)
return client
}
// Tests ADS stream level flow control with a single resource. The test does the
// following:
// - Starts a management server and configures a listener resource on it.
// - Creates an xDS client to the above management server, starts a couple of
// listener watchers for the above resource, and verifies that the update
// reaches these watchers.
// - These watchers don't invoke the onDone callback until explicitly
// triggered by the test. This allows the test to verify that the next
// Recv() call on the ADS stream does not happen until both watchers have
// completely processed the update, i.e invoke the onDone callback.
// - Resource is updated on the management server, and the test verifies that
// the update reaches the watchers.
func (s) TestADSFlowControl_ResourceUpdates_SingleResource(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Override the ADS stream creation.
adsStreamCh := overrideADSStreamCreation(t)
// Start an xDS management server.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
// Create an xDS client with the above bootstrap contents.
client := createXDSClient(t, bc)
// Configure two watchers for the same listener resource.
const listenerResourceName = "test-listener-resource"
const routeConfigurationName = "test-route-configuration-resource"
watcher1 := newBLockingListenerWatcher()
cancel1 := xdsresource.WatchListener(client, listenerResourceName, watcher1)
defer cancel1()
watcher2 := newBLockingListenerWatcher()
cancel2 := xdsresource.WatchListener(client, listenerResourceName, watcher2)
defer cancel2()
// Wait for the wrapped ADS stream to be created.
var adsStream *wrappedADSStream
select {
case adsStream = <-adsStreamCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be created")
}
// Configure the listener resource on the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Ensure that there is a read on the stream.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be read from")
}
// Wait for the update to reach the watchers.
select {
case <-watcher1.updateCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for update to reach watcher 1")
}
select {
case <-watcher2.updateCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for update to reach watcher 2")
}
// Update the listener resource on the management server to point to a new
// route configuration resource.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, "new-route")},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Unblock one watcher.
done := <-watcher1.doneNotifierCh
done.OnDone()
// Wait for a short duration and ensure that there is no read on the stream.
select {
case <-adsStream.recvCh:
t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update")
case <-time.After(defaultTestShortTimeout):
}
// Unblock the second watcher.
done = <-watcher2.doneNotifierCh
done.OnDone()
// Ensure that there is a read on the stream, now that the previous update
// has been consumed by all watchers.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream after all watchers have processed the previous update")
}
// Wait for the new update to reach the watchers.
select {
case <-watcher1.updateCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for update to reach watcher 1")
}
select {
case <-watcher2.updateCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for update to reach watcher 2")
}
// At this point, the xDS client is shut down (and the associated transport
// is closed) without the watchers invoking their respective onDone
// callbacks. This verifies that the closing a transport that has pending
// watchers does not block.
}
// Tests ADS stream level flow control with a multiple resources. The test does
// the following:
// - Starts a management server and configures two listener resources on it.
// - Creates an xDS client to the above management server, starts a couple of
// listener watchers for the two resources, and verifies that the update
// reaches these watchers.
// - These watchers don't invoke the onDone callback until explicitly
// triggered by the test. This allows the test to verify that the next
// Recv() call on the ADS stream does not happen until both watchers have
// completely processed the update, i.e invoke the onDone callback.
func (s) TestADSFlowControl_ResourceUpdates_MultipleResources(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Override the ADS stream creation.
adsStreamCh := overrideADSStreamCreation(t)
// Start an xDS management server.
const listenerResourceName1 = "test-listener-resource-1"
const listenerResourceName2 = "test-listener-resource-2"
wantResourceNames := []string{listenerResourceName1, listenerResourceName2}
requestCh := make(chan struct{}, 1)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() != version.V3ListenerURL {
return nil
}
gotResourceNames := req.GetResourceNames()
sort.Slice(gotResourceNames, func(i, j int) bool { return req.ResourceNames[i] < req.ResourceNames[j] })
if slices.Equal(gotResourceNames, wantResourceNames) {
// The two resource names will be part of the initial request
// and also the ACK. Hence, we need to make this write
// non-blocking.
select {
case requestCh <- struct{}{}:
default:
}
}
return nil
},
})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
// Create an xDS client with the above bootstrap contents.
client := createXDSClient(t, bc)
// Configure two watchers for two different listener resources.
const routeConfigurationName1 = "test-route-configuration-resource-1"
watcher1 := newBLockingListenerWatcher()
cancel1 := xdsresource.WatchListener(client, listenerResourceName1, watcher1)
defer cancel1()
const routeConfigurationName2 = "test-route-configuration-resource-2"
watcher2 := newBLockingListenerWatcher()
cancel2 := xdsresource.WatchListener(client, listenerResourceName2, watcher2)
defer cancel2()
// Wait for the wrapped ADS stream to be created.
var adsStream *wrappedADSStream
select {
case adsStream = <-adsStreamCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be created")
}
// Ensure that there is a read on the stream.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be read from")
}
// Wait for both resource names to be requested.
select {
case <-requestCh:
case <-ctx.Done():
t.Fatal("Timed out waiting for both resource names to be requested")
}
// Configure the listener resources on the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
e2e.DefaultClientListener(listenerResourceName1, routeConfigurationName1),
e2e.DefaultClientListener(listenerResourceName2, routeConfigurationName2),
},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// At this point, we expect the management server to send both resources in
// the same response. So, both watchers would be notified at the same time,
// and no more Recv() calls should happen until both of them have invoked
// their respective onDone() callbacks.
// The order of callback invocations among the two watchers is not
// guaranteed. So, we select on both of them and unblock the first watcher
// whose callback is invoked.
var otherWatcherUpdateCh chan struct{}
var otherWatcherDoneCh chan xdsresource.DoneNotifier
select {
case <-watcher1.updateCh:
done := <-watcher1.doneNotifierCh
done.OnDone()
otherWatcherUpdateCh = watcher2.updateCh
otherWatcherDoneCh = watcher2.doneNotifierCh
case <-watcher2.updateCh:
done := <-watcher2.doneNotifierCh
done.OnDone()
otherWatcherUpdateCh = watcher1.updateCh
otherWatcherDoneCh = watcher1.doneNotifierCh
case <-ctx.Done():
t.Fatal("Timed out waiting for update to reach first watchers")
}
// Wait for a short duration and ensure that there is no read on the stream.
select {
case <-adsStream.recvCh:
t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update")
case <-time.After(defaultTestShortTimeout):
}
// Wait for the update on the second watcher and unblock it.
select {
case <-otherWatcherUpdateCh:
done := <-otherWatcherDoneCh
done.OnDone()
case <-ctx.Done():
t.Fatal("Timed out waiting for update to reach second watcher")
}
// Ensure that there is a read on the stream, now that the previous update
// has been consumed by all watchers.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream after all watchers have processed the previous update")
}
}
// Test ADS stream flow control with a single resource that is expected to be
// NACKed by the xDS client and the watcher's OnError() callback is expected to
// be invoked. Verifies that no further reads are attempted until the error is
// completely processed by the watcher.
func (s) TestADSFlowControl_ResourceErrors(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Override the ADS stream creation.
adsStreamCh := overrideADSStreamCreation(t)
// Start an xDS management server.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
// Create an xDS client with the above bootstrap contents.
client := createXDSClient(t, bc)
// Configure a watcher for a listener resource.
const listenerResourceName = "test-listener-resource"
watcher := newBLockingListenerWatcher()
cancel = xdsresource.WatchListener(client, listenerResourceName, watcher)
defer cancel()
// Wait for the wrapped ADS stream to be created.
var adsStream *wrappedADSStream
select {
case adsStream = <-adsStreamCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be created")
}
// Configure the management server to return a single listener resource
// which is expected to be NACKed by the client.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{badListenerResource(t, listenerResourceName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Ensure that there is a read on the stream.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be read from")
}
// Wait for the error to reach the watcher.
select {
case <-watcher.errorCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for error to reach watcher")
}
// Wait for a short duration and ensure that there is no read on the stream.
select {
case <-adsStream.recvCh:
t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update")
case <-time.After(defaultTestShortTimeout):
}
// Unblock one watcher.
done := <-watcher.doneNotifierCh
done.OnDone()
// Ensure that there is a read on the stream, now that the previous error
// has been consumed by the watcher.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream after all watchers have processed the previous update")
}
}
// Test ADS stream flow control with a single resource that deleted from the
// management server and therefore the watcher's OnResourceDoesNotExist()
// callback is expected to be invoked. Verifies that no further reads are
// attempted until the callback is completely handled by the watcher.
func (s) TestADSFlowControl_ResourceDoesNotExist(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Override the ADS stream creation.
adsStreamCh := overrideADSStreamCreation(t)
// Start an xDS management server.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
// Create an xDS client with the above bootstrap contents.
client := createXDSClient(t, bc)
// Configure a watcher for a listener resource.
const listenerResourceName = "test-listener-resource"
const routeConfigurationName = "test-route-configuration-resource"
watcher := newBLockingListenerWatcher()
cancel = xdsresource.WatchListener(client, listenerResourceName, watcher)
defer cancel()
// Wait for the wrapped ADS stream to be created.
var adsStream *wrappedADSStream
select {
case adsStream = <-adsStreamCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for ADS stream to be created")
}
// Configure the listener resource on the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Ensure that there is a read on the stream.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream")
}
// Wait for the update to reach the watcher and unblock it.
select {
case <-watcher.updateCh:
done := <-watcher.doneNotifierCh
done.OnDone()
case <-ctx.Done():
t.Fatalf("Timed out waiting for update to reach watcher 1")
}
// Ensure that there is a read on the stream.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream")
}
// Remove the listener resource on the management server.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Wait for the resource not found callback to be invoked.
select {
case <-watcher.notFoundCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for resource not found callback to be invoked on the watcher")
}
// Wait for a short duration and ensure that there is no read on the stream.
select {
case <-adsStream.recvCh:
t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update")
case <-time.After(defaultTestShortTimeout):
}
// Unblock the watcher.
done := <-watcher.doneNotifierCh
done.OnDone()
// Ensure that there is a read on the stream.
select {
case <-adsStream.recvCh:
case <-ctx.Done():
t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream")
}
}

View File

@ -43,9 +43,15 @@ import (
type noopClusterWatcher struct{}
func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}
func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopClusterWatcher) OnError(err error, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopClusterWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
done.OnDone()
}
type clusterUpdateErrTuple struct {
update xdsresource.ClusterUpdate
@ -60,20 +66,23 @@ func newClusterWatcher() *clusterWatcher {
return &clusterWatcher{updateCh: testutils.NewChannel()}
}
func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, done xdsresource.DoneNotifier) {
cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource})
done.OnDone()
}
func (cw *clusterWatcher) OnError(err error) {
func (cw *clusterWatcher) OnError(err error, done xdsresource.DoneNotifier) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
cw.updateCh.Replace(clusterUpdateErrTuple{err: err})
done.OnDone()
}
func (cw *clusterWatcher) OnResourceDoesNotExist() {
func (cw *clusterWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
done.OnDone()
}
// badClusterResource returns a cluster resource for the given name which

View File

@ -53,9 +53,15 @@ const (
type noopEndpointsWatcher struct{}
func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {}
func (noopEndpointsWatcher) OnError(err error) {}
func (noopEndpointsWatcher) OnResourceDoesNotExist() {}
func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopEndpointsWatcher) OnError(err error, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopEndpointsWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
done.OnDone()
}
type endpointsUpdateErrTuple struct {
update xdsresource.EndpointsUpdate
@ -70,20 +76,23 @@ func newEndpointsWatcher() *endpointsWatcher {
return &endpointsWatcher{updateCh: testutils.NewChannel()}
}
func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {
func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, done xdsresource.DoneNotifier) {
ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource})
done.OnDone()
}
func (ew *endpointsWatcher) OnError(err error) {
func (ew *endpointsWatcher) OnError(err error, done xdsresource.DoneNotifier) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
ew.updateCh.Replace(endpointsUpdateErrTuple{err: err})
done.OnDone()
}
func (ew *endpointsWatcher) OnResourceDoesNotExist() {
func (ew *endpointsWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
done.OnDone()
}
// badEndpointsResource returns a endpoints resource for the given

View File

@ -48,9 +48,15 @@ import (
type noopListenerWatcher struct{}
func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {}
func (noopListenerWatcher) OnError(err error) {}
func (noopListenerWatcher) OnResourceDoesNotExist() {}
func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopListenerWatcher) OnError(err error, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopListenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
done.OnDone()
}
type listenerUpdateErrTuple struct {
update xdsresource.ListenerUpdate
@ -65,20 +71,23 @@ func newListenerWatcher() *listenerWatcher {
return &listenerWatcher{updateCh: testutils.NewChannel()}
}
func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) {
cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
done.OnDone()
}
func (cw *listenerWatcher) OnError(err error) {
func (cw *listenerWatcher) OnError(err error, done xdsresource.DoneNotifier) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
cw.updateCh.Replace(listenerUpdateErrTuple{err: err})
done.OnDone()
}
func (cw *listenerWatcher) OnResourceDoesNotExist() {
func (cw *listenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
done.OnDone()
}
// badListenerResource returns a listener resource for the given name which does

View File

@ -69,23 +69,26 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string)
}
}
func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) {
rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource})
rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1)
rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2)
done.OnDone()
}
func (rw *testRouteConfigWatcher) OnError(err error) {
func (rw *testRouteConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err})
done.OnDone()
}
func (rw *testRouteConfigWatcher) OnResourceDoesNotExist() {
func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
done.OnDone()
}
func (rw *testRouteConfigWatcher) cancel() {

View File

@ -43,9 +43,15 @@ import (
type noopRouteConfigWatcher struct{}
func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {}
func (noopRouteConfigWatcher) OnError(err error) {}
func (noopRouteConfigWatcher) OnResourceDoesNotExist() {}
func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopRouteConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) {
done.OnDone()
}
func (noopRouteConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
done.OnDone()
}
type routeConfigUpdateErrTuple struct {
update xdsresource.RouteConfigUpdate
@ -60,20 +66,23 @@ func newRouteConfigWatcher() *routeConfigWatcher {
return &routeConfigWatcher{updateCh: testutils.NewChannel()}
}
func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) {
rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource})
done.OnDone()
}
func (rw *routeConfigWatcher) OnError(err error) {
func (rw *routeConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err})
done.OnDone()
}
func (rw *routeConfigWatcher) OnResourceDoesNotExist() {
func (rw *routeConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) {
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
done.OnDone()
}
// badRouteConfigResource returns a RouteConfiguration resource for the given

View File

@ -68,7 +68,7 @@ func (s) TestReportLoad(t *testing.T) {
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
NodeProto: nodeProto,
OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No ADS validation.
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No ADS validation.
OnErrorHandler: func(error) {}, // No ADS stream error handling.
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No ADS stream update handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.

View File

@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc"
@ -35,8 +36,9 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/keepalive"
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/transport/internal"
transportinternal "google.golang.org/grpc/xds/internal/xdsclient/transport/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"
@ -46,13 +48,20 @@ import (
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)
type adsStream = v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
func init() {
transportinternal.GRPCNewClient = grpc.NewClient
xdsclientinternal.NewADSStream = func(ctx context.Context, cc *grpc.ClientConn) (adsStream, error) {
return v3adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx)
}
}
// Any per-RPC level logs which print complete request or response messages
// should be gated at this verbosity level. Other per-RPC level logs which print
// terse output should be at `INFO` and verbosity 2.
const perRPCVerbosityLevel = 9
type adsStream = v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// Transport provides a resource-type agnostic implementation of the xDS
// transport protocol. At this layer, resource contents are supposed to be
// opaque blobs which should be be meaningful only to the xDS data model layer
@ -112,7 +121,11 @@ type Transport struct {
// cause the transport layer to send an ACK to the management server. A non-nil
// error is returned from this function when the data model layer believes
// otherwise, and this will cause the transport layer to send a NACK.
type OnRecvHandlerFunc func(update ResourceUpdate) error
//
// The implementation is expected to use the ADS flow control object passed to
// it, and increment the number of watchers to whom the update is sent to, and
// eventually decrement the number once the update is consumed by the watchers.
type OnRecvHandlerFunc func(update ResourceUpdate, fc *ADSFlowControl) error
// OnSendHandlerFunc is the implementation at the authority, which handles state
// changes for the resource watch and stop watch timers accordingly.
@ -169,10 +182,6 @@ type Options struct {
NodeProto *v3corepb.Node
}
func init() {
internal.GRPCNewClient = grpc.NewClient
}
// New creates a new Transport.
func New(opts Options) (*Transport, error) {
switch {
@ -194,7 +203,7 @@ func New(opts Options) (*Transport, error) {
Timeout: 20 * time.Second,
}),
}
grpcNewClient := internal.GRPCNewClient.(func(string, ...grpc.DialOption) (*grpc.ClientConn, error))
grpcNewClient := transportinternal.GRPCNewClient.(func(string, ...grpc.DialOption) (*grpc.ClientConn, error))
cc, err := grpcNewClient(opts.ServerCfg.ServerURI(), dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
@ -262,12 +271,6 @@ func (t *Transport) SendRequest(url string, resources []string) {
})
}
func (t *Transport) newAggregatedDiscoveryServiceStream(ctx context.Context, cc *grpc.ClientConn) (adsStream, error) {
// The transport retries the stream with an exponential backoff whenever the
// stream breaks without ever having seen a response.
return v3adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx)
}
// ResourceSendInfo wraps the names and url of resources sent to the management
// server. This is used by the `authority` type to start/stop the watch timer
// associated with every resource in the update.
@ -329,7 +332,8 @@ func (t *Transport) adsRunner(ctx context.Context) {
// We reset backoff state when we successfully receive at least one
// message from the server.
runStreamWithBackoff := func() error {
stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc)
newStream := xdsclientinternal.NewADSStream.(func(context.Context, *grpc.ClientConn) (adsStream, error))
stream, err := newStream(ctx, t.cc)
if err != nil {
t.onErrorHandler(err)
t.logger.Warningf("Creating new ADS stream failed: %v", err)
@ -342,7 +346,7 @@ func (t *Transport) adsRunner(ctx context.Context) {
default:
}
t.adsStreamCh <- stream
msgReceived := t.recv(stream)
msgReceived := t.recv(ctx, stream)
if msgReceived {
return backoff.ErrResetBackoff
}
@ -462,9 +466,21 @@ func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err erro
// recv receives xDS responses on the provided ADS stream and branches out to
// message specific handlers. Returns true if at least one message was
// successfully received.
func (t *Transport) recv(stream adsStream) bool {
func (t *Transport) recv(ctx context.Context, stream adsStream) bool {
// Initialize the flow control quota for the stream. This helps to block the
// next read until the previous one is consumed by all watchers.
fc := NewADSStreamFlowControl()
msgReceived := false
for {
// Wait for ADS stream level flow control to be available.
if !fc.Wait(ctx) {
if t.logger.V(2) {
t.logger.Infof("ADS stream context canceled")
}
return msgReceived
}
resources, url, rVersion, nonce, err := t.recvAggregatedDiscoveryServiceResponse(stream)
if err != nil {
// Note that we do not consider it an error if the ADS stream was closed
@ -482,12 +498,12 @@ func (t *Transport) recv(stream adsStream) bool {
}
msgReceived = true
err = t.onRecvHandler(ResourceUpdate{
u := ResourceUpdate{
Resources: resources,
URL: url,
Version: rVersion,
})
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceTypeUnsupported {
}
if err = t.onRecvHandler(u, fc); xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceTypeUnsupported {
t.logger.Warningf("%v", err)
continue
}
@ -621,3 +637,73 @@ func (t *Transport) Close() {
func (t *Transport) ChannelConnectivityStateForTesting() connectivity.State {
return t.cc.GetState()
}
// ADSFlowControl implements ADS stream level flow control that enables the
// transport to block the reading of the next message off of the stream until
// the previous update is consumed by all watchers.
//
// The lifetime of the flow control is tied to the lifetime of the stream.
//
// New instances must be created with a call to NewADSStreamFlowControl.
type ADSFlowControl struct {
logger *grpclog.PrefixLogger
// Count of watchers yet to consume the most recent update.
pending atomic.Int64
// Channel used to notify when all the watchers have consumed the most
// recent update. Wait() blocks on reading a value from this channel.
readyCh chan struct{}
}
// NewADSStreamFlowControl returns a new ADSFlowControl.
func NewADSStreamFlowControl() *ADSFlowControl {
return &ADSFlowControl{readyCh: make(chan struct{}, 1)}
}
// Add increments the number of watchers (by one) who are yet to consume the
// most recent update received on the ADS stream.
func (fc *ADSFlowControl) Add() {
fc.pending.Add(1)
}
// Wait blocks until all the watchers have consumed the most recent update and
// returns true. If the context expires before that, it returns false.
func (fc *ADSFlowControl) Wait(ctx context.Context) bool {
// If there are no watchers or none with pending updates, there is no need
// to block.
if n := fc.pending.Load(); n == 0 {
// If all watchers finished processing the most recent update before the
// `recv` goroutine made the next call to `Wait()`, there would be an
// entry in the readyCh channel that needs to be drained to ensure that
// the next call to `Wait()` doesn't unblock before it actually should.
select {
case <-fc.readyCh:
default:
}
return true
}
select {
case <-ctx.Done():
return false
case <-fc.readyCh:
return true
}
}
// OnDone indicates that a watcher has consumed the most recent update.
func (fc *ADSFlowControl) OnDone() {
if pending := fc.pending.Add(-1); pending != 0 {
return
}
select {
// Writes to the readyCh channel should not block ideally. The default
// branch here is to appease the paranoid mind.
case fc.readyCh <- struct{}{}:
default:
if fc.logger.V(2) {
fc.logger.Infof("ADS stream flow control readyCh is full")
}
}
}

View File

@ -49,7 +49,7 @@ var (
// A simple update handler for listener resources which validates only the
// `use_original_dst` field.
dataModelValidator = func(update transport.ResourceUpdate) error {
dataModelValidator = func(update transport.ResourceUpdate, _ *transport.ADSFlowControl) error {
for _, r := range update.Resources {
inner := &v3discoverypb.Resource{}
if err := proto.Unmarshal(r.GetValue(), inner); err != nil {

View File

@ -101,7 +101,7 @@ func (s) TestTransport_BackoffAfterStreamFailure(t *testing.T) {
nodeID := uuid.New().String()
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No data model layer validation.
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No data model layer validation.
OnErrorHandler: func(err error) {
select {
case streamErrCh <- err:
@ -262,7 +262,7 @@ func (s) TestTransport_RetriesAfterBrokenStream(t *testing.T) {
// we can pass a no-op data model layer implementation.
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No data model layer validation.
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No data model layer validation.
OnErrorHandler: func(err error) {
select {
case streamErrCh <- err:
@ -394,7 +394,7 @@ func (s) TestTransport_ResourceRequestedBeforeStreamCreation(t *testing.T) {
nodeID := uuid.New().String()
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No data model layer validation.
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil }, // No data model layer validation.
OnErrorHandler: func(error) {}, // No stream error handling.
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.

View File

@ -53,7 +53,7 @@ func (s) TestNew(t *testing.T) {
opts: transport.Options{
ServerCfg: serverCfg,
NodeProto: &v3corepb.Node{},
OnRecvHandler: func(transport.ResourceUpdate) error { return nil },
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil },
OnSendHandler: func(*transport.ResourceSendInfo) {},
},
wantErrStr: "missing OnError callback handler when creating a new transport",
@ -64,7 +64,7 @@ func (s) TestNew(t *testing.T) {
opts: transport.Options{
ServerCfg: serverCfg,
NodeProto: &v3corepb.Node{},
OnRecvHandler: func(transport.ResourceUpdate) error { return nil },
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil },
OnErrorHandler: func(error) {},
},
wantErrStr: "missing OnSend callback handler when creating a new transport",
@ -74,7 +74,7 @@ func (s) TestNew(t *testing.T) {
opts: transport.Options{
ServerCfg: serverCfg,
NodeProto: &v3corepb.Node{},
OnRecvHandler: func(transport.ResourceUpdate) error { return nil },
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil },
OnErrorHandler: func(error) {},
OnSendHandler: func(*transport.ResourceSendInfo) {},
},

View File

@ -185,7 +185,7 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) {
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
// No validation. Simply push received resources on a channel.
OnRecvHandler: func(update transport.ResourceUpdate) error {
OnRecvHandler: func(update transport.ResourceUpdate, _ *transport.ADSFlowControl) error {
resourcesCh.Send(&resourcesWithTypeURL{
resources: update.Resources,
url: update.URL,
@ -239,9 +239,7 @@ func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: func(update transport.ResourceUpdate) error {
return nil
},
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil },
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
@ -332,9 +330,7 @@ func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: func(update transport.ResourceUpdate) error {
return nil
},
OnRecvHandler: func(transport.ResourceUpdate, *transport.ADSFlowControl) error { return nil },
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.

View File

@ -47,7 +47,10 @@ func (s) TestNewWithGRPCDial(t *testing.T) {
opts := transport.Options{
ServerCfg: serverCfg,
NodeProto: &v3corepb.Node{},
OnRecvHandler: func(transport.ResourceUpdate) error { return nil },
OnRecvHandler: func(update transport.ResourceUpdate, fc *transport.ADSFlowControl) error {
fc.OnDone()
return nil
},
OnErrorHandler: func(error) {},
OnSendHandler: func(*transport.ResourceSendInfo) {},
}

View File

@ -111,7 +111,10 @@ func (c *ClusterResourceData) Raw() *anypb.Any {
// corresponding to the cluster resource being watched.
type ClusterWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*ClusterResourceData)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnUpdate(*ClusterResourceData, DoneNotifier)
// OnError is invoked under different error conditions including but not
// limited to the following:
@ -121,28 +124,34 @@ type ClusterWatcher interface {
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnError(error, DoneNotifier)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnResourceDoesNotExist(DoneNotifier)
}
type delegatingClusterWatcher struct {
watcher ClusterWatcher
}
func (d *delegatingClusterWatcher) OnUpdate(data ResourceData) {
func (d *delegatingClusterWatcher) OnUpdate(data ResourceData, done DoneNotifier) {
c := data.(*ClusterResourceData)
d.watcher.OnUpdate(c)
d.watcher.OnUpdate(c, done)
}
func (d *delegatingClusterWatcher) OnError(err error) {
d.watcher.OnError(err)
func (d *delegatingClusterWatcher) OnError(err error, done DoneNotifier) {
d.watcher.OnError(err, done)
}
func (d *delegatingClusterWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
func (d *delegatingClusterWatcher) OnResourceDoesNotExist(done DoneNotifier) {
d.watcher.OnResourceDoesNotExist(done)
}
// WatchCluster uses xDS to discover the configuration associated with the

View File

@ -107,7 +107,10 @@ func (e *EndpointsResourceData) Raw() *anypb.Any {
// events corresponding to the endpoints resource being watched.
type EndpointsWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*EndpointsResourceData)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnUpdate(*EndpointsResourceData, DoneNotifier)
// OnError is invoked under different error conditions including but not
// limited to the following:
@ -117,28 +120,34 @@ type EndpointsWatcher interface {
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnError(error, DoneNotifier)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnResourceDoesNotExist(DoneNotifier)
}
type delegatingEndpointsWatcher struct {
watcher EndpointsWatcher
}
func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData) {
func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData, done DoneNotifier) {
e := data.(*EndpointsResourceData)
d.watcher.OnUpdate(e)
d.watcher.OnUpdate(e, done)
}
func (d *delegatingEndpointsWatcher) OnError(err error) {
d.watcher.OnError(err)
func (d *delegatingEndpointsWatcher) OnError(err error, done DoneNotifier) {
d.watcher.OnError(err, done)
}
func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist(done DoneNotifier) {
d.watcher.OnResourceDoesNotExist(done)
}
// WatchEndpoints uses xDS to discover the configuration associated with the

View File

@ -144,7 +144,10 @@ func (l *ListenerResourceData) Raw() *anypb.Any {
// events corresponding to the listener resource being watched.
type ListenerWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*ListenerResourceData)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnUpdate(*ListenerResourceData, DoneNotifier)
// OnError is invoked under different error conditions including but not
// limited to the following:
@ -154,28 +157,34 @@ type ListenerWatcher interface {
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnError(error, DoneNotifier)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnResourceDoesNotExist(DoneNotifier)
}
type delegatingListenerWatcher struct {
watcher ListenerWatcher
}
func (d *delegatingListenerWatcher) OnUpdate(data ResourceData) {
func (d *delegatingListenerWatcher) OnUpdate(data ResourceData, done DoneNotifier) {
l := data.(*ListenerResourceData)
d.watcher.OnUpdate(l)
d.watcher.OnUpdate(l, done)
}
func (d *delegatingListenerWatcher) OnError(err error) {
d.watcher.OnError(err)
func (d *delegatingListenerWatcher) OnError(err error, done DoneNotifier) {
d.watcher.OnError(err, done)
}
func (d *delegatingListenerWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
func (d *delegatingListenerWatcher) OnResourceDoesNotExist(done DoneNotifier) {
d.watcher.OnResourceDoesNotExist(done)
}
// WatchListener uses xDS to discover the configuration associated with the

View File

@ -52,13 +52,29 @@ type Producer interface {
WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func())
}
// DoneNotifier wraps the OnDone callback to be invoked once a resource update
// is processed by the watcher.
type DoneNotifier interface {
OnDone()
}
// NopDoneNotifier is a concrete implementation of the DoneNotifier interface,
// that serves as a convenient placeholder when the callback is not needed.
type NopDoneNotifier struct{}
// OnDone implements the DoneNotifier interface.
func (NopDoneNotifier) OnDone() {}
// ResourceWatcher wraps the callbacks to be invoked for different events
// corresponding to the resource being watched.
type ResourceWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
// The ResourceData parameter needs to be type asserted to the appropriate
// type for the resource being watched.
OnUpdate(ResourceData)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnUpdate(ResourceData, DoneNotifier)
// OnError is invoked under different error conditions including but not
// limited to the following:
@ -68,11 +84,11 @@ type ResourceWatcher interface {
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
OnError(error, DoneNotifier)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
OnResourceDoesNotExist(DoneNotifier)
}
// TODO: Once the implementation is complete, rename this interface as

View File

@ -108,7 +108,10 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any {
// events corresponding to the route configuration resource being watched.
type RouteConfigWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*RouteConfigResourceData)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnUpdate(*RouteConfigResourceData, DoneNotifier)
// OnError is invoked under different error conditions including but not
// limited to the following:
@ -118,28 +121,34 @@ type RouteConfigWatcher interface {
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnError(error, DoneNotifier)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
//
// The watcher is expected to call Done() on the DoneNotifier once it has
// processed the update.
OnResourceDoesNotExist(DoneNotifier)
}
type delegatingRouteConfigWatcher struct {
watcher RouteConfigWatcher
}
func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData) {
func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData, done DoneNotifier) {
rc := data.(*RouteConfigResourceData)
d.watcher.OnUpdate(rc)
d.watcher.OnUpdate(rc, done)
}
func (d *delegatingRouteConfigWatcher) OnError(err error) {
d.watcher.OnError(err)
func (d *delegatingRouteConfigWatcher) OnError(err error, done DoneNotifier) {
d.watcher.OnError(err, done)
}
func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist(done DoneNotifier) {
d.watcher.OnResourceDoesNotExist(done)
}
// WatchRouteConfig uses xDS to discover the configuration associated with the