xds/internal/resolver: switch to generic xDS API for LDS/RDS (#6729)

This commit is contained in:
Easwar Swaminathan 2023-12-07 14:39:06 -08:00 committed by GitHub
parent a03c7f1faa
commit 477bd62419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 449 additions and 402 deletions

View File

@ -188,7 +188,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) {
target = fmt.Sprintf("xds://unknown-authority/%s", serviceName)
t.Logf("Dialing target %q with unknown authority which is expected to fail", target)
const wantErr = `authority "unknown-authority" is not found in the bootstrap file`
wantErr := fmt.Sprintf("authority \"unknown-authority\" specified in dial target %q is not found in the bootstrap file", target)
_, err = grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Fatalf("grpc.Dial(%q) returned %v, want: %s", target, err, wantErr)

View File

@ -39,7 +39,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -72,16 +71,6 @@ type xdsClusterManagerConfig struct {
Children map[string]xdsChildConfig `json:"children"`
}
// pruneActiveClusters deletes entries in r.activeClusters with zero
// references.
func (r *xdsResolver) pruneActiveClusters() {
for cluster, ci := range r.activeClusters {
if atomic.LoadInt32(&ci.refCount) == 0 {
delete(r.activeClusters, cluster)
}
}
}
// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters. This includes clusters with zero
// references, so they must be pruned first.
@ -193,10 +182,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
if v := atomic.AddInt32(ref, -1); v == 0 {
// This entry will be removed from activeClusters when
// producing the service config for the empty update.
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
cs.r.serializer.Schedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}
},
Interceptor: interceptor,
@ -338,99 +326,12 @@ func (cs *configSelector) stop() {
// selector; we need another update to delete clusters from the config (if
// we don't have another update pending already).
if needUpdate {
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
cs.r.serializer.Schedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}
}
// newConfigSelector creates the config selector for su; may add entries to
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
}
for i, rt := range su.virtualHost.Routes {
clusters := rinternal.NewWRR.(func() wrr.WRR)()
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{
name: clusterName,
}, 1)
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
})
} else {
for cluster, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + cluster
clusters.Add(&routeCluster{
name: clusterName,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
})
}
}
cs.routes[i].clusters = clusters
var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].actionType = rt.ActionType
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}
cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}
// Account for this config selector's clusters. Do this after no further
// errors may occur. Note: cs.clusters are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
atomic.AddInt32(&ci.refCount, 1)
}
return cs, nil
}
// initializeCluster initializes entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Any created entries will have a ref count set
// to zero as their ref count will be incremented by incRefs.
func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
ci := cs.r.activeClusters[clusterName]
if ci == nil {
ci = &clusterInfo{refCount: 0}
cs.r.activeClusters[clusterName] = ci
}
cs.clusters[clusterName] = ci
cs.clusters[clusterName].cfg = cfg
}
type clusterInfo struct {
// number of references to this cluster; accessed atomically
refCount int32
// cfg is the child configuration for this cluster, containing either the
// csp config or the cds cluster config.
cfg xdsChildConfig
}
type interceptorList struct {
interceptors []iresolver.ClientInterceptor
}

View File

@ -19,185 +19,77 @@
package resolver
import (
"fmt"
"sync"
"time"
"context"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// serviceUpdate contains information received from the LDS/RDS responses which
// are of interest to the xds resolver. The RDS request is built by first
// making a LDS to get the RouteConfig name.
type serviceUpdate struct {
// virtualHost contains routes and other configuration to route RPCs.
virtualHost *xdsresource.VirtualHost
// clusterSpecifierPlugins contains the configurations for any cluster
// specifier plugins emitted by the xdsclient.
clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
// ldsConfig contains configuration that applies to all routes.
ldsConfig ldsConfig
type listenerWatcher struct {
resourceName string
cancel func()
parent *xdsResolver
}
// ldsConfig contains information received from the LDS responses which are of
// interest to the xds resolver.
type ldsConfig struct {
// maxStreamDuration is from the HTTP connection manager's
// common_http_protocol_options field.
maxStreamDuration time.Duration
httpFilterConfig []xdsresource.HTTPFilter
func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher {
lw := &listenerWatcher{resourceName: resourceName, parent: parent}
lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw)
return lw
}
// watchService uses LDS and RDS to discover information about the provided
// serviceName.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
//
// TODO(easwars): Make this function a method on the xdsResolver type.
// Currently, there is a single call site for this function, and all arguments
// passed to it are fields of the xdsResolver type.
func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
w := &serviceUpdateWatcher{
logger: logger,
c: c,
serviceName: serviceName,
serviceCb: cb,
}
w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)
return w.close
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.onListenerResourceUpdate(update.Resource)
})
}
// serviceUpdateWatcher handles LDS and RDS response, and calls the service
// callback at the right time.
type serviceUpdateWatcher struct {
logger *grpclog.PrefixLogger
c xdsclient.XDSClient
serviceName string
ldsCancel func()
serviceCb func(serviceUpdate, error)
lastUpdate serviceUpdate
mu sync.Mutex
closed bool
rdsName string
rdsCancel func()
func (l *listenerWatcher) OnError(err error) {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.onListenerResourceError(err)
})
}
func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
if err != nil {
// We check the error type and do different things. For now, the only
// type we check is ResourceNotFound, which indicates the LDS resource
// was removed, and besides sending the error to callback, we also
// cancel the RDS watch.
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
w.lastUpdate = serviceUpdate{}
}
// The other error cases still return early without canceling the
// existing RDS watch.
w.serviceCb(serviceUpdate{}, err)
return
}
w.lastUpdate.ldsConfig = ldsConfig{
maxStreamDuration: update.MaxStreamDuration,
httpFilterConfig: update.HTTPFilters,
}
if update.InlineRouteConfig != nil {
// If there was an RDS watch, cancel it.
w.rdsName = ""
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}
// Handle the inline RDS update as if it's from an RDS watch.
w.applyRouteConfigUpdate(*update.InlineRouteConfig)
return
}
// RDS name from update is not an empty string, need RDS to fetch the
// routes.
if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
//
// If the route name did change, then we must wait until the first RDS
// update before reporting this LDS config.
if w.lastUpdate.virtualHost != nil {
// We want to send an update with the new fields from the new LDS
// (e.g. max stream duration), and old fields from the previous
// RDS.
//
// But note that this should only happen when virtual host is set,
// which means an RDS was received.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
if w.rdsCancel != nil {
w.rdsCancel()
}
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
func (l *listenerWatcher) OnResourceDoesNotExist() {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.onListenerResourceNotFound()
})
}
func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}
w.lastUpdate.virtualHost = matchVh
w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
w.serviceCb(w.lastUpdate, nil)
func (l *listenerWatcher) stop() {
l.cancel()
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
}
func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
if w.rdsCancel == nil {
// This mean only the RDS watch is canceled, can happen if the LDS
// resource is removed.
return
}
if err != nil {
w.serviceCb(serviceUpdate{}, err)
return
}
w.applyRouteConfigUpdate(update)
type routeConfigWatcher struct {
resourceName string
cancel func()
parent *xdsResolver
}
func (w *serviceUpdateWatcher) close() {
w.mu.Lock()
defer w.mu.Unlock()
w.closed = true
w.ldsCancel()
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}
func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher {
rw := &routeConfigWatcher{resourceName: resourceName, parent: parent}
rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw)
return rw
}
func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
})
}
func (r *routeConfigWatcher) OnError(err error) {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.onRouteConfigResourceError(r.resourceName, err)
})
}
func (r *routeConfigWatcher) OnResourceDoesNotExist() {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.onRouteConfigResourceNotFound(r.resourceName)
})
}
func (r *routeConfigWatcher) stop() {
r.cancel()
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
}

View File

@ -20,9 +20,10 @@
package resolver
import (
"errors"
"context"
"fmt"
"strings"
"sync/atomic"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
@ -75,8 +76,6 @@ type xdsResolverBuilder struct {
func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) {
r := &xdsResolver{
cc: cc,
closed: grpcsync.NewEvent(),
updateCh: make(chan suWithError, 1),
activeClusters: make(map[string]*clusterInfo),
channelID: grpcrand.Uint64(),
}
@ -88,26 +87,65 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
r.logger = prefixLogger(r)
r.logger.Infof("Creating resolver for target: %+v", target)
// Initialize the serializer used to synchronize the following:
// - updates from the xDS client. This could lead to generation of new
// service config if resolution is complete.
// - completion of an RPC to a removed cluster causing the associated ref
// count to become zero, resulting in generation of new service config.
// - stopping of a config selector that results in generation of new service
// config.
ctx, cancel := context.WithCancel(context.Background())
r.serializer = grpcsync.NewCallbackSerializer(ctx)
r.serializerCancel = cancel
// Initialize the xDS client.
newXDSClient := rinternal.NewXDSClient.(func() (xdsclient.XDSClient, func(), error))
if b.newXDSClient != nil {
newXDSClient = b.newXDSClient
}
client, close, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
}
r.xdsClient = client
r.xdsClientClose = close
// Determine the listener resource name and start a watcher for it.
template, err := r.sanityChecksOnBootstrapConfig(target, opts, r.xdsClient)
if err != nil {
return nil, err
}
endpoint := target.URL.Path
if endpoint == "" {
endpoint = target.URL.Opaque
}
endpoint = strings.TrimPrefix(endpoint, "/")
r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint)
r.listenerWatcher = newListenerWatcher(r.ldsResourceName, r)
return r, nil
}
// Performs the following sanity checks:
// - Verifies that the bootstrap configuration is not empty.
// - Verifies that if xDS credentials are specified by the user, the
// bootstrap configuration contains certificate providers.
// - Verifies that if the provided dial target contains an authority, the
// bootstrap configuration contains server config for that authority.
//
// Returns the listener resource name template to use. If any of the above
// validations fail, a non-nil error is returned.
func (r *xdsResolver) sanityChecksOnBootstrapConfig(target resolver.Target, opts resolver.BuildOptions, client xdsclient.XDSClient) (string, error) {
bootstrapConfig := client.BootstrapConfig()
if bootstrapConfig == nil {
return nil, errors.New("bootstrap configuration is empty")
// This is never expected to happen after a successful xDS client
// creation. Defensive programming.
return "", fmt.Errorf("xds: bootstrap configuration is empty")
}
// If xds credentials were specified by the user, but bootstrap configs do
// not contain any certificate provider configuration, it is better to fail
// right now rather than failing when attempting to create certificate
// providers after receiving an CDS response with security configuration.
// If xDS credentials were specified by the user, but the bootstrap config
// does not contain any certificate providers, it is better to fail right
// now rather than failing when attempting to create certificate providers
// after receiving an CDS response with security configuration.
var creds credentials.TransportCredentials
switch {
case opts.DialCreds != nil:
@ -117,7 +155,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
}
if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
if len(bootstrapConfig.CertProviderConfigs) == 0 {
return nil, fmt.Errorf("xds: use of xDS credentials is specified, but certificate_providers config missing in bootstrap file")
return "", fmt.Errorf("xds: use of xDS credentials is specified, but certificate_providers config missing in bootstrap file")
}
}
@ -128,7 +166,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
if authority := target.URL.Host; authority != "" {
a := bootstrapConfig.Authorities[authority]
if a == nil {
return nil, fmt.Errorf("xds: authority %q is not found in the bootstrap file", authority)
return "", fmt.Errorf("xds: authority %q specified in dial target %q is not found in the bootstrap file", authority, target)
}
if a.ClientListenerResourceNameTemplate != "" {
// This check will never be false, because
@ -137,23 +175,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
template = a.ClientListenerResourceNameTemplate
}
}
endpoint := target.URL.Path
if endpoint == "" {
endpoint = target.URL.Opaque
}
endpoint = strings.TrimPrefix(endpoint, "/")
r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint)
// Register a watch on the xdsClient for the resource name determined above.
cancelWatch := watchService(r.xdsClient, r.ldsResourceName, r.handleServiceUpdate, r.logger)
r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
r.cancelWatch = func() {
cancelWatch()
r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
}
go r.run()
return r, nil
return template, nil
}
// Name helps implement the resolver.Builder interface.
@ -161,43 +183,73 @@ func (*xdsResolverBuilder) Scheme() string {
return Scheme
}
// suWithError wraps the ServiceUpdate and error received through a watch API
// callback, so that it can pushed onto the update channel as a single entity.
type suWithError struct {
su serviceUpdate
emptyUpdate bool
err error
}
// xdsResolver implements the resolver.Resolver interface.
//
// It registers a watcher for ServiceConfig updates with the xdsClient object
// (which performs LDS/RDS queries for the same), and passes the received
// updates to the ClientConn.
type xdsResolver struct {
cc resolver.ClientConn
closed *grpcsync.Event
logger *grpclog.PrefixLogger
ldsResourceName string
cc resolver.ClientConn
logger *grpclog.PrefixLogger
// The underlying xdsClient which performs all xDS requests and responses.
xdsClient xdsclient.XDSClient
xdsClientClose func()
// A channel for the watch API callback to write service updates on to. The
// updates are read by the run goroutine and passed on to the ClientConn.
updateCh chan suWithError
// cancelWatch is the function to cancel the watcher.
cancelWatch func()
// activeClusters is a map from cluster name to a ref count. Only read or
// written during a service update (synchronous).
activeClusters map[string]*clusterInfo
curConfigSelector *configSelector
// A random number which uniquely identifies the channel which owns this
// resolver.
channelID uint64
// All methods on the xdsResolver type except for the ones invoked by gRPC,
// i.e ResolveNow() and Close(), are guaranteed to execute in the context of
// this serializer's callback. And since the serializer guarantees mutual
// exclusion among these callbacks, we can get by without any mutexes to
// access all of the below defined state. The only exception is Close(),
// which does access some of this shared state, but it does so after
// cancelling the context passed to the serializer.
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
ldsResourceName string
listenerWatcher *listenerWatcher
listenerUpdateRecvd bool
currentListener xdsresource.ListenerUpdate
rdsResourceName string
routeConfigWatcher *routeConfigWatcher
routeConfigUpdateRecvd bool
currentRouteConfig xdsresource.RouteConfigUpdate
currentVirtualHost *xdsresource.VirtualHost // Matched virtual host for quick access.
// activeClusters is a map from cluster name to information about the
// cluster that includes a ref count and load balancing configuration.
activeClusters map[string]*clusterInfo
curConfigSelector *configSelector
}
// ResolveNow is a no-op at this point.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *xdsResolver) Close() {
// Cancel the context passed to the serializer and wait for any scheduled
// callbacks to complete. Canceling the context ensures that no new
// callbacks will be scheduled.
r.serializerCancel()
<-r.serializer.Done()
// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
if r.listenerWatcher != nil {
r.listenerWatcher.stop()
}
if r.routeConfigWatcher != nil {
r.routeConfigWatcher.stop()
}
if r.xdsClientClose != nil {
r.xdsClientClose()
}
r.logger.Infof("Shutdown")
}
// sendNewServiceConfig prunes active clusters, generates a new service config
@ -205,6 +257,8 @@ type xdsResolver struct {
// channel with that service config and the provided config selector. Returns
// false if an error occurs while generating the service config and the update
// cannot be sent.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
// Delete entries from r.activeClusters with zero references;
// otherwise serviceConfigJSON will generate a config including
@ -222,11 +276,11 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
sc, err := serviceConfigJSON(r.activeClusters)
if err != nil {
// JSON marshal error; should never happen.
r.logger.Errorf("%v", err)
r.logger.Errorf("For Listener resource %q and RouteConfiguration resource %q, failed to marshal newly built service config: %v", r.ldsResourceName, r.rdsResourceName, err)
r.cc.ReportError(err)
return false
}
r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.xdsClient, pretty.FormatJSON(sc))
r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc))
// Send the update to the ClientConn.
state := iresolver.SetConfigSelector(resolver.State{
@ -236,94 +290,294 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
return true
}
// run is a long running goroutine which blocks on receiving service updates
// and passes it on the ClientConn.
func (r *xdsResolver) run() {
for {
select {
case <-r.closed.Done():
return
case update := <-r.updateCh:
if update.err != nil {
r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.xdsClient, update.err)
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
// If error is resource-not-found, it means the LDS
// resource was removed. Ultimately send an empty service
// config, which picks pick-first, with no address, and
// puts the ClientConn into transient failure. Before we
// can do that, we may need to send a normal service config
// along with an erroring (nil) config selector.
r.sendNewServiceConfig(nil)
// Stop and dereference the active config selector, if one exists.
r.curConfigSelector.stop()
r.curConfigSelector = nil
continue
}
// Send error to ClientConn, and balancers, if error is not
// resource not found. No need to update resolver state if we
// can keep using the old config.
r.cc.ReportError(update.err)
continue
}
if update.emptyUpdate {
r.sendNewServiceConfig(r.curConfigSelector)
continue
}
// newConfigSelector creates a new config selector using the most recently
// received listener and route config updates. May add entries to
// r.activeClusters for previously-unseen clusters.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: r.currentVirtualHost.HTTPFilterConfigOverride,
retryConfig: r.currentVirtualHost.RetryConfig,
},
routes: make([]route, len(r.currentVirtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: r.currentListener.HTTPFilters,
}
// Create the config selector for this update.
cs, err := r.newConfigSelector(update.su)
if err != nil {
r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.xdsClient, err)
r.cc.ReportError(err)
continue
for i, rt := range r.currentVirtualHost.Routes {
clusters := rinternal.NewWRR.(func() wrr.WRR)()
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{
name: clusterName,
}, 1)
ci := r.addOrGetActiveClusterInfo(clusterName)
ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
cs.clusters[clusterName] = ci
} else {
for cluster, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + cluster
clusters.Add(&routeCluster{
name: clusterName,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))
ci := r.addOrGetActiveClusterInfo(clusterName)
ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster})}
cs.clusters[clusterName] = ci
}
}
cs.routes[i].clusters = clusters
if !r.sendNewServiceConfig(cs) {
// JSON error creating the service config (unexpected); erase
// this config selector and ignore this update, continuing with
// the previous config selector.
cs.stop()
continue
}
var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].actionType = rt.ActionType
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}
// Decrement references to the old config selector and assign the
// new one as the current one.
r.curConfigSelector.stop()
r.curConfigSelector = cs
cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}
// Account for this config selector's clusters. Do this after no further
// errors may occur. Note: cs.clusters are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
atomic.AddInt32(&ci.refCount, 1)
}
return cs, nil
}
// pruneActiveClusters deletes entries in r.activeClusters with zero
// references.
func (r *xdsResolver) pruneActiveClusters() {
for cluster, ci := range r.activeClusters {
if atomic.LoadInt32(&ci.refCount) == 0 {
delete(r.activeClusters, cluster)
}
}
}
// handleServiceUpdate is the callback which handles service updates. It writes
// the received update to the update channel, which is picked by the run
// goroutine.
func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) {
if r.closed.HasFired() {
// Do not pass updates to the ClientConn once the resolver is closed.
func (r *xdsResolver) addOrGetActiveClusterInfo(name string) *clusterInfo {
ci := r.activeClusters[name]
if ci != nil {
return ci
}
ci = &clusterInfo{refCount: 0}
r.activeClusters[name] = ci
return ci
}
type clusterInfo struct {
// number of references to this cluster; accessed atomically
refCount int32
// cfg is the child configuration for this cluster, containing either the
// csp config or the cds cluster config.
cfg xdsChildConfig
}
// Determines if the xdsResolver has received all required configuration, i.e
// Listener and RouteConfiguration resources, from the management server, and
// whether a matching virtual host was found in the RouteConfiguration resource.
func (r *xdsResolver) resolutionComplete() bool {
return r.listenerUpdateRecvd && r.routeConfigUpdateRecvd && r.currentVirtualHost != nil
}
// onResolutionComplete performs the following actions when resolution is
// complete, i.e Listener and RouteConfiguration resources have been received
// from the management server and a matching virtual host is found in the
// latter.
// - creates a new config selector (this involves incrementing references to
// clusters owned by this config selector).
// - stops the old config selector (this involves decrementing references to
// clusters owned by this config selector).
// - prunes active clusters and pushes a new service config to the channel.
// - updates the current config selector used by the resolver.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onResolutionComplete() {
if !r.resolutionComplete() {
return
}
// Remove any existing entry in updateCh and replace with the new one.
select {
case <-r.updateCh:
default:
cs, err := r.newConfigSelector()
if err != nil {
r.logger.Warningf("Failed to build a config selector for resource %q: %v", r.ldsResourceName, err)
r.cc.ReportError(err)
return
}
r.updateCh <- suWithError{su: su, err: err}
if !r.sendNewServiceConfig(cs) {
// JSON error creating the service config (unexpected); erase
// this config selector and ignore this update, continuing with
// the previous config selector.
cs.stop()
return
}
r.curConfigSelector.stop()
r.curConfigSelector = cs
}
// ResolveNow is a no-op at this point.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *xdsResolver) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
matchVh := xdsresource.FindBestMatchingVirtualHost(r.ldsResourceName, update.VirtualHosts)
if matchVh == nil {
r.onError(fmt.Errorf("no matching virtual host found for %q", r.ldsResourceName))
return
}
r.currentRouteConfig = update
r.currentVirtualHost = matchVh
r.routeConfigUpdateRecvd = true
// Close closes the resolver, and also closes the underlying xdsClient.
func (r *xdsResolver) Close() {
// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
if r.cancelWatch != nil {
r.cancelWatch()
}
if r.xdsClientClose != nil {
r.xdsClientClose()
}
r.closed.Fire()
r.logger.Infof("Shutdown")
r.onResolutionComplete()
}
// onError propagates the error up to the channel. And since this is invoked
// only for non resource-not-found errors, we don't have to update resolver
// state and we can keep using the old config.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onError(err error) {
r.cc.ReportError(err)
}
// Contains common functionality to be executed when resources of either type
// are removed.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onResourceNotFound() {
// We cannot remove clusters from the service config that have ongoing RPCs.
// Instead, what we can do is to send an erroring (nil) config selector
// along with normal service config. This will ensure that new RPCs will
// fail, and once the active RPCs complete, the reference counts on the
// clusters will come down to zero. At that point, we will send an empty
// service config with no addresses. This results in the pick-first
// LB policy being configured on the channel, and since there are no
// address, pick-first will put the channel in TRANSIENT_FAILURE.
r.sendNewServiceConfig(nil)
// Stop and dereference the active config selector, if one exists.
r.curConfigSelector.stop()
r.curConfigSelector = nil
}
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate) {
if r.logger.V(2) {
r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update))
}
r.currentListener = update
r.listenerUpdateRecvd = true
if update.InlineRouteConfig != nil {
// If there was a previous route config watcher because of a non-inline
// route configuration, cancel it.
r.rdsResourceName = ""
if r.routeConfigWatcher != nil {
r.routeConfigWatcher.stop()
r.routeConfigWatcher = nil
}
r.applyRouteConfigUpdate(*update.InlineRouteConfig)
return
}
// We get here only if there was no inline route configuration.
// If the route config name has not changed, send an update with existing
// route configuration and the newly received listener configuration.
if r.rdsResourceName == update.RouteConfigName {
r.onResolutionComplete()
return
}
// If the route config name has changed, cancel the old watcher and start a
// new one. At this point, since we have not yet resolved the new route
// config name, we don't send an update to the channel, and therefore
// continue using the old route configuration (if received) until the new
// one is received.
r.rdsResourceName = update.RouteConfigName
if r.routeConfigWatcher != nil {
r.routeConfigWatcher.stop()
r.currentVirtualHost = nil
r.routeConfigUpdateRecvd = false
}
r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
}
func (r *xdsResolver) onListenerResourceError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
}
r.onError(err)
}
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceNotFound() {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
}
r.listenerUpdateRecvd = false
if r.routeConfigWatcher != nil {
r.routeConfigWatcher.stop()
}
r.rdsResourceName = ""
r.currentVirtualHost = nil
r.routeConfigUpdateRecvd = false
r.routeConfigWatcher = nil
r.onResourceNotFound()
}
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresource.RouteConfigUpdate) {
if r.logger.V(2) {
r.logger.Infof("Received update for RouteConfiguration resource %q: %v", name, pretty.ToJSON(update))
}
if r.rdsResourceName != name {
// Drop updates from canceled watchers.
return
}
r.applyRouteConfigUpdate(update)
}
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
}
r.onError(err)
}
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
}
if r.rdsResourceName != name {
return
}
r.onResourceNotFound()
}
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onClusterRefDownToZero() {
r.sendNewServiceConfig(r.curConfigSelector)
}

View File

@ -103,7 +103,7 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) {
NodeID: "node-id",
ServerURI: "dummy-management-server",
},
wantErr: `authority "non-existing-authority" is not found in the bootstrap file`,
wantErr: `authority "non-existing-authority" specified in dial target "xds://non-existing-authority/target" is not found in the bootstrap file`,
},
{
name: "xDS creds specified without certificate providers in bootstrap",
@ -997,7 +997,7 @@ func (s) TestResolverWRR(t *testing.T) {
// Read the update pushed by the resolver to the ClientConn.
cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
// Make RPCs are verify WRR behavior in the cluster specifier.
// Make RPCs to verify WRR behavior in the cluster specifier.
picks := map[string]int{}
for i := 0; i < 100; i++ {
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})