xds: Export a WatchListener() method on the xdsClient. (#3817)

This commit is contained in:
Easwar Swaminathan 2020-08-18 15:40:27 -07:00 committed by GitHub
parent a3740e5ed3
commit 0f73133e3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 445 additions and 353 deletions

View File

@ -96,10 +96,10 @@ type APIClientBuilder interface {
// version specific implementations of the xDS client.
type APIClient interface {
// AddWatch adds a watch for an xDS resource given its type and name.
AddWatch(resourceType, resourceName string)
AddWatch(ResourceType, string)
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
RemoveWatch(resourceType, resourceName string)
RemoveWatch(ResourceType, string)
// Close cleans up resources allocated by the API client.
Close()
}
@ -396,3 +396,65 @@ func (c *Client) Close() {
c.cc.Close()
c.logger.Infof("Shutdown")
}
// ResourceType identifies resources in a transport protocol agnostic way. These
// will be used in transport version agnostic code, while the versioned API
// clients will map these to appropriate version URLs.
type ResourceType int

// Version agnostic resource type constants.
const (
	UnknownResource ResourceType = iota
	ListenerResource
	HTTPConnManagerResource
	RouteConfigResource
	ClusterResource
	EndpointsResource
)

// resourceTypeNames holds the human-readable name for every known resource
// type; values absent from this table stringify as "UnknownResource".
var resourceTypeNames = map[ResourceType]string{
	ListenerResource:        "ListenerResource",
	HTTPConnManagerResource: "HTTPConnManagerResource",
	RouteConfigResource:     "RouteConfigResource",
	ClusterResource:         "ClusterResource",
	EndpointsResource:       "EndpointsResource",
}

// String returns the human-readable name of the resource type, falling back
// to "UnknownResource" for unrecognized values.
func (r ResourceType) String() string {
	if name, ok := resourceTypeNames[r]; ok {
		return name
	}
	return "UnknownResource"
}
// IsListenerResource returns true if the provided URL corresponds to an xDS
// Listener resource (either the v2 or the v3 type URL).
func IsListenerResource(url string) bool {
	switch url {
	case version.V2ListenerURL, version.V3ListenerURL:
		return true
	}
	return false
}
// IsHTTPConnManagerResource returns true if the provided URL corresponds to an
// xDS HTTPConnManager resource (either the v2 or the v3 type URL).
func IsHTTPConnManagerResource(url string) bool {
	switch url {
	case version.V2HTTPConnManagerURL, version.V3HTTPConnManagerURL:
		return true
	}
	return false
}
// IsRouteConfigResource returns true if the provided URL corresponds to an xDS
// RouteConfig resource (either the v2 or the v3 type URL).
func IsRouteConfigResource(url string) bool {
	switch url {
	case version.V2RouteConfigURL, version.V3RouteConfigURL:
		return true
	}
	return false
}
// IsClusterResource returns true if the provided URL corresponds to an xDS
// Cluster resource (either the v2 or the v3 type URL).
func IsClusterResource(url string) bool {
	switch url {
	case version.V2ClusterURL, version.V3ClusterURL:
		return true
	}
	return false
}
// IsEndpointsResource returns true if the provided URL corresponds to an xDS
// Endpoints resource (either the v2 or the v3 type URL).
func IsEndpointsResource(url string) bool {
	switch url {
	case version.V2EndpointsURL, version.V3EndpointsURL:
		return true
	}
	return false
}

View File

@ -18,8 +18,6 @@
package client
import "google.golang.org/grpc/xds/internal/version"
type watcherInfoWithUpdate struct {
wi *watchInfo
update interface{}
@ -46,20 +44,20 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
// window that a watcher's callback could be called after the watcher is
// canceled, and the user needs to take care of it.
var ccb func()
switch wiu.wi.typeURL {
case version.V2ListenerURL:
switch wiu.wi.rType {
case ListenerResource:
if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) }
}
case version.V2RouteConfigURL:
case RouteConfigResource:
if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) }
}
case version.V2ClusterURL:
case ClusterResource:
if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) }
}
case version.V2EndpointsURL:
case EndpointsResource:
if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) }
}

View File

@ -67,8 +67,8 @@ func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options {
type testAPIClient struct {
r UpdateHandler
addWatches map[string]*testutils.Channel
removeWatches map[string]*testutils.Channel
addWatches map[ResourceType]*testutils.Channel
removeWatches map[ResourceType]*testutils.Channel
}
func overrideNewAPIClient() (<-chan *testAPIClient, func()) {
@ -83,16 +83,18 @@ func overrideNewAPIClient() (<-chan *testAPIClient, func()) {
}
func newTestAPIClient(r UpdateHandler) *testAPIClient {
addWatches := make(map[string]*testutils.Channel)
addWatches[version.V2ListenerURL] = testutils.NewChannel()
addWatches[version.V2RouteConfigURL] = testutils.NewChannel()
addWatches[version.V2ClusterURL] = testutils.NewChannel()
addWatches[version.V2EndpointsURL] = testutils.NewChannel()
removeWatches := make(map[string]*testutils.Channel)
removeWatches[version.V2ListenerURL] = testutils.NewChannel()
removeWatches[version.V2RouteConfigURL] = testutils.NewChannel()
removeWatches[version.V2ClusterURL] = testutils.NewChannel()
removeWatches[version.V2EndpointsURL] = testutils.NewChannel()
addWatches := map[ResourceType]*testutils.Channel{
ListenerResource: testutils.NewChannel(),
RouteConfigResource: testutils.NewChannel(),
ClusterResource: testutils.NewChannel(),
EndpointsResource: testutils.NewChannel(),
}
removeWatches := map[ResourceType]*testutils.Channel{
ListenerResource: testutils.NewChannel(),
RouteConfigResource: testutils.NewChannel(),
ClusterResource: testutils.NewChannel(),
EndpointsResource: testutils.NewChannel(),
}
return &testAPIClient{
r: r,
addWatches: addWatches,
@ -100,11 +102,11 @@ func newTestAPIClient(r UpdateHandler) *testAPIClient {
}
}
func (c *testAPIClient) AddWatch(resourceType, resourceName string) {
// AddWatch sends the resource name on the test channel registered for the
// given resource type, allowing tests to verify that a new watch was
// requested on the underlying API client.
func (c *testAPIClient) AddWatch(resourceType ResourceType, resourceName string) {
	c.addWatches[resourceType].Send(resourceName)
}
func (c *testAPIClient) RemoveWatch(resourceType, resourceName string) {
// RemoveWatch sends the resource name on the test channel registered for the
// given resource type, allowing tests to verify that a watch was canceled on
// the underlying API client.
func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName string) {
	c.removeWatches[resourceType].Send(resourceName)
}
@ -130,12 +132,12 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
// Calls another watch inline, to ensure there's no deadlock.
c.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); firstTime && err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

View File

@ -22,8 +22,6 @@ import (
"fmt"
"sync"
"time"
"google.golang.org/grpc/xds/internal/version"
)
type watchInfoState int
@ -37,9 +35,9 @@ const (
// watchInfo holds all the information from a watch() call.
type watchInfo struct {
c *Client
typeURL string
target string
c *Client
rType ResourceType
target string
ldsCallback func(ListenerUpdate, error)
rdsCallback func(RouteConfigUpdate, error)
@ -74,7 +72,7 @@ func (wi *watchInfo) resourceNotFound() {
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", wi.typeURL, wi.target))
wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %v target %s not found in received response", wi.rType, wi.target))
}
func (wi *watchInfo) timeout() {
@ -84,7 +82,7 @@ func (wi *watchInfo) timeout() {
return
}
wi.state = watchInfoStateTimeout
wi.sendErrorLocked(fmt.Errorf("xds: %s target %s not found, watcher timeout", wi.typeURL, wi.target))
wi.sendErrorLocked(fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target))
}
// Caller must hold wi.mu.
@ -92,14 +90,14 @@ func (wi *watchInfo) sendErrorLocked(err error) {
var (
u interface{}
)
switch wi.typeURL {
case version.V2ListenerURL:
switch wi.rType {
case ListenerResource:
u = ListenerUpdate{}
case version.V2RouteConfigURL:
case RouteConfigResource:
u = RouteConfigUpdate{}
case version.V2ClusterURL:
case ClusterResource:
u = ClusterUpdate{}
case version.V2EndpointsURL:
case EndpointsResource:
u = EndpointsUpdate{}
}
wi.c.scheduleCallback(wi, u, err)
@ -118,54 +116,54 @@ func (wi *watchInfo) cancel() {
func (c *Client) watch(wi *watchInfo) (cancel func()) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.Debugf("new watch for type %v, resource name %v", wi.typeURL, wi.target)
c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)
var watchers map[string]map[*watchInfo]bool
switch wi.typeURL {
case version.V2ListenerURL:
switch wi.rType {
case ListenerResource:
watchers = c.ldsWatchers
case version.V2RouteConfigURL:
case RouteConfigResource:
watchers = c.rdsWatchers
case version.V2ClusterURL:
case ClusterResource:
watchers = c.cdsWatchers
case version.V2EndpointsURL:
case EndpointsResource:
watchers = c.edsWatchers
}
resourceName := wi.target
s, ok := watchers[wi.target]
if !ok {
// If this is a new watcher, will ask lower level to send a new request with
// the resource name.
// If this is a new watcher, will ask lower level to send a new request
// with the resource name.
//
// If this type+name is already being watched, will not notify the
// underlying xdsv2Client.
c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.typeURL, wi.target)
// If this (type+name) is already being watched, will not notify the
// underlying versioned apiClient.
c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
s = make(map[*watchInfo]bool)
watchers[resourceName] = s
c.apiClient.AddWatch(wi.typeURL, resourceName)
c.apiClient.AddWatch(wi.rType, resourceName)
}
// No matter what, add the new watcher to the set, so its callback will be
// called for new responses.
s[wi] = true
// If the resource is in cache, call the callback with the value.
switch wi.typeURL {
case version.V2ListenerURL:
switch wi.rType {
case ListenerResource:
if v, ok := c.ldsCache[resourceName]; ok {
c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case version.V2RouteConfigURL:
case RouteConfigResource:
if v, ok := c.rdsCache[resourceName]; ok {
c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case version.V2ClusterURL:
case ClusterResource:
if v, ok := c.cdsCache[resourceName]; ok {
c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case version.V2EndpointsURL:
case EndpointsResource:
if v, ok := c.edsCache[resourceName]; ok {
c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
@ -173,7 +171,7 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
}
return func() {
c.logger.Debugf("watch for type %v, resource name %v canceled", wi.typeURL, wi.target)
c.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
wi.cancel()
c.mu.Lock()
defer c.mu.Unlock()
@ -182,22 +180,22 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
// future.
delete(s, wi)
if len(s) == 0 {
c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.typeURL, wi.target)
c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
// If this was the last watcher, also tell the underlying versioned
// apiClient to stop watching this resource.
delete(watchers, resourceName)
c.apiClient.RemoveWatch(wi.typeURL, resourceName)
c.apiClient.RemoveWatch(wi.rType, resourceName)
// Remove the resource from cache. When a watch for this
// resource is added later, it will trigger a xDS request with
// resource names, and client will receive new xDS responses.
switch wi.typeURL {
case version.V2ListenerURL:
switch wi.rType {
case ListenerResource:
delete(c.ldsCache, resourceName)
case version.V2RouteConfigURL:
case RouteConfigResource:
delete(c.rdsCache, resourceName)
case version.V2ClusterURL:
case ClusterResource:
delete(c.cdsCache, resourceName)
case version.V2EndpointsURL:
case EndpointsResource:
delete(c.edsCache, resourceName)
}
}
@ -205,6 +203,18 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
}
}
// WatchListener uses LDS to discover information about the provided listener.
//
// WatchListener is expected to be called only from the server side
// implementation of xDS. Clients will use WatchService instead.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchListener(listener string, cb func(ListenerUpdate, error)) (cancel func()) {
	return c.watchLDS(listener, cb)
}
// watchLDS starts a listener watcher for the service.
//
// Note that during race (e.g. an xDS response is received while the user is
@ -213,7 +223,7 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
func (c *Client) watchLDS(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
typeURL: version.V2ListenerURL,
rType: ListenerResource,
target: serviceName,
ldsCallback: cb,
}
@ -232,7 +242,7 @@ func (c *Client) watchLDS(serviceName string, cb func(ListenerUpdate, error)) (c
func (c *Client) watchRDS(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
typeURL: version.V2RouteConfigURL,
rType: RouteConfigResource,
target: routeName,
rdsCallback: cb,
}
@ -358,7 +368,7 @@ func (w *serviceUpdateWatcher) close() {
func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
typeURL: version.V2ClusterURL,
rType: ClusterResource,
target: clusterName,
cdsCallback: cb,
}
@ -380,7 +390,7 @@ func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error))
func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
typeURL: version.V2EndpointsURL,
rType: EndpointsResource,
target: clusterName,
edsCallback: cb,
}

View File

@ -22,7 +22,6 @@ import (
"testing"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/version"
)
type clusterUpdateErr struct {
@ -53,7 +52,7 @@ func (s) TestClusterWatch(t *testing.T) {
cancelWatch := c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -117,7 +116,7 @@ func (s) TestClusterTwoWatchSameResourceName(t *testing.T) {
cancelLastWatch = c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -174,7 +173,7 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -184,7 +183,7 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -224,7 +223,7 @@ func (s) TestClusterWatchAfterCache(t *testing.T) {
c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -242,7 +241,7 @@ func (s) TestClusterWatchAfterCache(t *testing.T) {
c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})
if n, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err == nil {
if n, err := v2Client.addWatches[ClusterResource].Receive(); err == nil {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
@ -276,7 +275,7 @@ func (s) TestClusterWatchExpiryTimer(t *testing.T) {
c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -312,7 +311,7 @@ func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -354,7 +353,7 @@ func (s) TestClusterResourceRemoved(t *testing.T) {
c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
// Another watch for a different name.
@ -362,7 +361,7 @@ func (s) TestClusterResourceRemoved(t *testing.T) {
c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ClusterURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

View File

@ -23,9 +23,9 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/version"
)
var (
@ -71,7 +71,7 @@ func (s) TestEndpointsWatch(t *testing.T) {
cancelWatch := c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil {
if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -129,7 +129,7 @@ func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) {
cancelLastWatch = c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[EndpointsResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -186,7 +186,7 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) {
c.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[EndpointsResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -196,7 +196,7 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) {
c.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) {
endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil {
if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -236,7 +236,7 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) {
c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil {
if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -254,7 +254,7 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) {
c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
})
if n, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err == nil {
if n, err := v2Client.addWatches[EndpointsResource].Receive(); err == nil {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
@ -288,7 +288,7 @@ func (s) TestEndpointsWatchExpiryTimer(t *testing.T) {
c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2EndpointsURL].Receive(); err != nil {
if _, err := v2Client.addWatches[EndpointsResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

View File

@ -22,7 +22,6 @@ import (
"testing"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/version"
)
type ldsUpdateErr struct {
@ -50,7 +49,7 @@ func (s) TestLDSWatch(t *testing.T) {
cancelWatch := c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -109,7 +108,7 @@ func (s) TestLDSTwoWatchSameResourceName(t *testing.T) {
cancelLastWatch = c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -166,7 +165,7 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) {
c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -176,7 +175,7 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) {
c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -216,7 +215,7 @@ func (s) TestLDSWatchAfterCache(t *testing.T) {
c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -234,7 +233,7 @@ func (s) TestLDSWatchAfterCache(t *testing.T) {
c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})
if n, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err == nil {
if n, err := v2Client.addWatches[ListenerResource].Receive(); err == nil {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
@ -271,7 +270,7 @@ func (s) TestLDSResourceRemoved(t *testing.T) {
c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
// Another watch for a different name.
@ -279,7 +278,7 @@ func (s) TestLDSResourceRemoved(t *testing.T) {
c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

View File

@ -23,7 +23,6 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/version"
)
type rdsUpdateErr struct {
@ -51,7 +50,7 @@ func (s) TestRDSWatch(t *testing.T) {
cancelWatch := c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -109,7 +108,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
cancelLastWatch = c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -166,7 +165,7 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
c.watchRDS(testRDSName+"1", func(update RouteConfigUpdate, err error) {
rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); i == 0 && err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); i == 0 && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@ -176,7 +175,7 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
c.watchRDS(testRDSName+"2", func(update RouteConfigUpdate, err error) {
rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -216,7 +215,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -234,7 +233,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
})
if n, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err == nil {
if n, err := v2Client.addWatches[RouteConfigResource].Receive(); err == nil {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}

View File

@ -23,8 +23,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/version"
)
type serviceUpdateErr struct {
@ -56,13 +56,13 @@ func (s) TestServiceWatch(t *testing.T) {
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
@ -114,13 +114,13 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
@ -135,7 +135,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName + "2"},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
@ -181,13 +181,13 @@ func (s) TestServiceWatchSecond(t *testing.T) {
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
@ -253,7 +253,7 @@ func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) {
c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
u, err := serviceUpdateCh.TimedReceive(defaultTestWatchExpiryTimeout * 2)
@ -288,13 +288,13 @@ func (s) TestServiceWatchEmptyRDS(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{})
@ -331,13 +331,13 @@ func (s) TestServiceWatchWithClientClose(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
// Client is closed before it receives the RDS response.
@ -369,13 +369,13 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
@ -390,7 +390,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if v, err := v2Client.removeWatches[version.V2RouteConfigURL].Receive(); err == nil {
if v, err := v2Client.removeWatches[RouteConfigResource].Receive(); err == nil {
t.Fatalf("unexpected rds watch cancel: %v", v)
}
}
@ -420,13 +420,13 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
if _, err := v2Client.addWatches[version.V2ListenerURL].Receive(); err != nil {
if _, err := v2Client.addWatches[ListenerResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
@ -440,7 +440,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
// Remove LDS resource, should cancel the RDS watch, and trigger resource
// removed error.
v2Client.r.NewListeners(map[string]ListenerUpdate{})
if _, err := v2Client.removeWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.removeWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want watch to be canceled, got error %v", err)
}
if u, err := serviceUpdateCh.Receive(); err != nil || ErrType(u.(serviceUpdateErr).err) != ErrorTypeResourceNotFound {
@ -462,7 +462,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
v2Client.r.NewListeners(map[string]ListenerUpdate{
testLDSName: {RouteConfigName: testRDSName},
})
if _, err := v2Client.addWatches[version.V2RouteConfigURL].Receive(); err != nil {
if _, err := v2Client.addWatches[RouteConfigResource].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {

View File

@ -33,9 +33,9 @@ import (
v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/version"
)
// UnmarshalListener processes resources received in an LDS response, validates
@ -44,8 +44,8 @@ import (
func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]ListenerUpdate, error) {
update := make(map[string]ListenerUpdate)
for _, r := range resources {
if t := r.GetTypeUrl(); t != version.V2ListenerURL && t != version.V3ListenerURL {
return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", t)
if !IsListenerResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl())
}
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
@ -68,8 +68,8 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog.
return "", fmt.Errorf("xds: no api_listener field in LDS response %+v", lis)
}
apiLisAny := lis.GetApiListener().GetApiListener()
if t := apiLisAny.GetTypeUrl(); t != version.V3HTTPConnManagerURL && t != version.V2HTTPConnManagerURL {
return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", t)
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl())
}
apiLis := &v3httppb.HttpConnectionManager{}
if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
@ -105,8 +105,8 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog.
func UnmarshalRouteConfig(resources []*anypb.Any, hostname string, logger *grpclog.PrefixLogger) (map[string]RouteConfigUpdate, error) {
update := make(map[string]RouteConfigUpdate)
for _, r := range resources {
if t := r.GetTypeUrl(); t != version.V2RouteConfigURL && t != version.V3RouteConfigURL {
return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", t)
if !IsRouteConfigResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl())
}
rc := &v3routepb.RouteConfiguration{}
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
@ -369,8 +369,8 @@ func findBestMatchingVirtualHost(host string, vHosts []*v3routepb.VirtualHost) *
func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]ClusterUpdate, error) {
update := make(map[string]ClusterUpdate)
for _, r := range resources {
if t := r.GetTypeUrl(); t != version.V2ClusterURL && t != version.V3ClusterURL {
return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", t)
if !IsClusterResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl())
}
cluster := &v3clusterpb.Cluster{}
@ -417,8 +417,8 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]EndpointsUpdate, error) {
update := make(map[string]EndpointsUpdate)
for _, r := range resources {
if t := r.GetTypeUrl(); t != version.V2EndpointsURL && t != version.V3EndpointsURL {
return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", t)
if !IsEndpointsResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl())
}
cla := &v3endpointpb.ClusterLoadAssignment{}

View File

@ -48,11 +48,9 @@ type VersionedClient interface {
// transport protocol version.
NewStream(ctx context.Context) (grpc.ClientStream, error)
// params: resources, typeURL, version, nonce
// SendRequest constructs and sends out a DiscoveryRequest message specific
// to the underlying transport protocol version.
SendRequest(s grpc.ClientStream, resourceNames []string, typeURL string, version string, nonce string) error
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error
// RecvResponse uses the provided stream to receive a response specific to
// the underlying transport protocol version.
@ -61,11 +59,11 @@ type VersionedClient interface {
// HandleResponse parses and validates the received response and notifies
// the top-level client which in turn notifies the registered watchers.
//
// Return values are: typeURL, version, nonce, error.
// Return values are: resourceType, version, nonce, error.
// If the provided protobuf message contains a resource type which is not
// supported, implementations must return an error of type
// ErrResourceTypeUnsupported.
HandleResponse(proto.Message) (string, string, string, error)
HandleResponse(proto.Message) (ResourceType, string, string, error)
}
// TransportHelper contains all xDS transport protocol related functionality
@ -95,14 +93,14 @@ type TransportHelper struct {
// messages. When the user of this client object cancels a watch call,
// these are set to nil. All accesses to the map protected and any value
// inside the map should be protected with the above mutex.
watchMap map[string]map[string]bool
watchMap map[ResourceType]map[string]bool
// versionMap contains the version that was acked (the version in the ack
// request that was sent on wire). The key is typeURL, the value is the
// request that was sent on wire). The key is rType, the value is the
// version string, becaues the versions for different resource types should
// be independent.
versionMap map[string]string
versionMap map[ResourceType]string
// nonceMap contains the nonce from the most recent received response.
nonceMap map[string]string
nonceMap map[ResourceType]string
}
// NewTransportHelper creates a new transport helper to be used by versioned
@ -117,9 +115,9 @@ func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backof
streamCh: make(chan grpc.ClientStream, 1),
sendCh: buffer.NewUnbounded(),
watchMap: make(map[string]map[string]bool),
versionMap: make(map[string]string),
nonceMap: make(map[string]string),
watchMap: make(map[ResourceType]map[string]bool),
versionMap: make(map[ResourceType]string),
nonceMap: make(map[ResourceType]string),
}
go t.run(ctx)
@ -127,9 +125,9 @@ func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backof
}
// AddWatch adds a watch for an xDS resource given its type and name.
func (t *TransportHelper) AddWatch(resourceType, resourceName string) {
func (t *TransportHelper) AddWatch(rType ResourceType, resourceName string) {
t.sendCh.Put(&watchAction{
typeURL: resourceType,
rType: rType,
remove: false,
resource: resourceName,
})
@ -137,9 +135,9 @@ func (t *TransportHelper) AddWatch(resourceType, resourceName string) {
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
func (t *TransportHelper) RemoveWatch(resourceType, resourceName string) {
func (t *TransportHelper) RemoveWatch(rType ResourceType, resourceName string) {
t.sendCh.Put(&watchAction{
typeURL: resourceType,
rType: rType,
remove: true,
resource: resourceName,
})
@ -228,15 +226,16 @@ func (t *TransportHelper) send(ctx context.Context) {
t.sendCh.Load()
var (
target []string
typeURL, version, nonce string
send bool
target []string
rType ResourceType
version, nonce string
send bool
)
switch update := u.(type) {
case *watchAction:
target, typeURL, version, nonce = t.processWatchInfo(update)
target, rType, version, nonce = t.processWatchInfo(update)
case *ackAction:
target, typeURL, version, nonce, send = t.processAckInfo(update, stream)
target, rType, version, nonce, send = t.processAckInfo(update, stream)
if !send {
continue
}
@ -248,8 +247,8 @@ func (t *TransportHelper) send(ctx context.Context) {
// sending response back).
continue
}
if err := t.vClient.SendRequest(stream, target, typeURL, version, nonce); err != nil {
t.logger.Warningf("ADS request for {target: %q, type: %q, version: %q, nonce: %q} failed: %v", target, typeURL, version, nonce, err)
if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil {
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
// send failed, clear the current stream.
stream = nil
}
@ -269,11 +268,11 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool {
defer t.mu.Unlock()
// Reset the ack versions when the stream restarts.
t.versionMap = make(map[string]string)
t.nonceMap = make(map[string]string)
t.versionMap = make(map[ResourceType]string)
t.nonceMap = make(map[ResourceType]string)
for typeURL, s := range t.watchMap {
if err := t.vClient.SendRequest(stream, mapToSlice(s), typeURL, "", ""); err != nil {
for rType, s := range t.watchMap {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil {
t.logger.Errorf("ADS request failed: %v", err)
return false
}
@ -292,28 +291,28 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool {
t.logger.Warningf("ADS stream is closed with error: %v", err)
return success
}
typeURL, version, nonce, err := t.vClient.HandleResponse(resp)
rType, version, nonce, err := t.vClient.HandleResponse(resp)
if e, ok := err.(ErrResourceTypeUnsupported); ok {
t.logger.Warningf("%s", e.ErrStr)
continue
}
if err != nil {
t.sendCh.Put(&ackAction{
typeURL: typeURL,
rType: rType,
version: "",
nonce: nonce,
stream: stream,
})
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, version, nonce, err)
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
continue
}
t.sendCh.Put(&ackAction{
typeURL: typeURL,
rType: rType,
version: version,
nonce: nonce,
stream: stream,
})
t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, version, nonce)
t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce)
success = true
}
}
@ -326,46 +325,46 @@ func mapToSlice(m map[string]bool) (ret []string) {
}
type watchAction struct {
typeURL string
rType ResourceType
remove bool // Whether this is to remove watch for the resource.
resource string
}
// processWatchInfo pulls the fields needed by the request from a watchAction.
//
// It also updates the watch map in v2c.
func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, typeURL, ver, nonce string) {
// It also updates the watch map.
func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, rType ResourceType, ver, nonce string) {
t.mu.Lock()
defer t.mu.Unlock()
var current map[string]bool
current, ok := t.watchMap[w.typeURL]
current, ok := t.watchMap[w.rType]
if !ok {
current = make(map[string]bool)
t.watchMap[w.typeURL] = current
t.watchMap[w.rType] = current
}
if w.remove {
delete(current, w.resource)
if len(current) == 0 {
delete(t.watchMap, w.typeURL)
delete(t.watchMap, w.rType)
}
} else {
current[w.resource] = true
}
typeURL = w.typeURL
rType = w.rType
target = mapToSlice(current)
// We don't reset version or nonce when a new watch is started. The version
// and nonce from previous response are carried by the request unless the
// stream is recreated.
ver = t.versionMap[typeURL]
nonce = t.nonceMap[typeURL]
return target, typeURL, ver, nonce
ver = t.versionMap[rType]
nonce = t.nonceMap[rType]
return target, rType, ver, nonce
}
type ackAction struct {
typeURL string
rType ResourceType
version string // NACK if version is an empty string.
nonce string
// ACK/NACK are tagged with the stream it's for. When the stream is down,
@ -377,15 +376,15 @@ type ackAction struct {
// processAckInfo pulls the fields needed by the ack request from a ackAction.
//
// If no active watch is found for this ack, it returns false for send.
func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, typeURL, version, nonce string, send bool) {
func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) {
if ack.stream != stream {
// If ACK's stream isn't the current sending stream, this means the ACK
// was pushed to queue before the old stream broke, and a new stream has
// been started since. Return immediately here so we don't update the
// nonce for the new stream.
return nil, "", "", "", false
return nil, UnknownResource, "", "", false
}
typeURL = ack.typeURL
rType = ack.rType
t.mu.Lock()
defer t.mu.Unlock()
@ -394,16 +393,16 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea
// wire. We may not send the request if the watch is canceled. But the nonce
// needs to be updated so the next request will have the right nonce.
nonce = ack.nonce
t.nonceMap[typeURL] = nonce
t.nonceMap[rType] = nonce
s, ok := t.watchMap[typeURL]
s, ok := t.watchMap[rType]
if !ok || len(s) == 0 {
// We don't send the request ack if there's no active watch (this can be
// either the server sends responses before any request, or the watch is
// canceled while the ackAction is in queue), because there's no resource
// name. And if we send a request with empty resource name list, the
// server may treat it as a wild card and send us everything.
return nil, "", "", "", false
return nil, UnknownResource, "", "", false
}
send = true
target = mapToSlice(s)
@ -411,12 +410,12 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea
version = ack.version
if version == "" {
// This is a nack, get the previous acked version.
version = t.versionMap[typeURL]
// version will still be an empty string if typeURL isn't
version = t.versionMap[rType]
// version will still be an empty string if rType isn't
// found in versionMap, this can happen if there wasn't any ack
// before.
} else {
t.versionMap[typeURL] = version
t.versionMap[rType] = version
}
return target, typeURL, version, nonce, send
return target, rType, version, nonce, send
}

View File

@ -39,6 +39,15 @@ func init() {
xdsclient.RegisterAPIClientBuilder(clientBuilder{})
}
var (
resourceTypeToURL = map[xdsclient.ResourceType]string{
xdsclient.ListenerResource: version.V2ListenerURL,
xdsclient.RouteConfigResource: version.V2RouteConfigURL,
xdsclient.ClusterResource: version.V2ClusterURL,
xdsclient.EndpointsResource: version.V2EndpointsURL,
}
)
type clientBuilder struct{}
func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) {
@ -95,30 +104,30 @@ type client struct {
// AddWatch overrides the transport helper's AddWatch to save the LDS
// resource_name. This is required when handling an RDS response to perform host
// matching.
func (v2c *client) AddWatch(resourceType, resourceName string) {
func (v2c *client) AddWatch(rType xdsclient.ResourceType, rName string) {
v2c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL {
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v2c.ldsWatchCount++
if v2c.ldsWatchCount == 1 {
v2c.ldsResourceName = resourceName
v2c.ldsResourceName = rName
}
}
v2c.mu.Unlock()
v2c.TransportHelper.AddWatch(resourceType, resourceName)
v2c.TransportHelper.AddWatch(rType, rName)
}
// RemoveWatch overrides the transport helper's RemoveWatch to clear the LDS
// resource_name when the last watch is removed.
func (v2c *client) RemoveWatch(resourceType, resourceName string) {
func (v2c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) {
v2c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL {
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
@ -128,30 +137,29 @@ func (v2c *client) RemoveWatch(resourceType, resourceName string) {
}
}
v2c.mu.Unlock()
v2c.TransportHelper.RemoveWatch(resourceType, resourceName)
v2c.TransportHelper.RemoveWatch(rType, rName)
}
func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
return v2adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc).StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true))
}
// sendRequest sends a request for provided typeURL and resource on the provided
// stream.
// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type
// rType, on the provided stream.
//
// version is the ack version to be sent with the request
// - If this is the new request (not an ack/nack), version will be an empty
// string
// - If this is an ack, version will be the version from the response
// - If this is the new request (not an ack/nack), version will be empty.
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be an empty string
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, typeURL, version, nonce string) error {
// versionMap). If there was no ack before, it will be empty.
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
}
req := &v2xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: typeURL,
TypeUrl: resourceTypeToURL[rType],
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
@ -182,10 +190,11 @@ func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {
return resp, nil
}
func (v2c *client) HandleResponse(r proto.Message) (string, string, string, error) {
func (v2c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) {
rType := xdsclient.UnknownResource
resp, ok := r.(*v2xdspb.DiscoveryResponse)
if !ok {
return "", "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
}
// Note that the xDS transport protocol is versioned independently of
@ -193,21 +202,26 @@ func (v2c *client) HandleResponse(r proto.Message) (string, string, string, erro
// of resource types using new versions of the transport protocol, or
// vice-versa. Hence we need to handle v3 type_urls as well here.
var err error
switch resp.GetTypeUrl() {
case version.V2ListenerURL, version.V3ListenerURL:
url := resp.GetTypeUrl()
switch {
case xdsclient.IsListenerResource(url):
err = v2c.handleLDSResponse(resp)
case version.V2RouteConfigURL, version.V3RouteConfigURL:
rType = xdsclient.ListenerResource
case xdsclient.IsRouteConfigResource(url):
err = v2c.handleRDSResponse(resp)
case version.V2ClusterURL, version.V3ClusterURL:
rType = xdsclient.RouteConfigResource
case xdsclient.IsClusterResource(url):
err = v2c.handleCDSResponse(resp)
case version.V2EndpointsURL, version.V3EndpointsURL:
rType = xdsclient.ClusterResource
case xdsclient.IsEndpointsResource(url):
err = v2c.handleEDSResponse(resp)
rType = xdsclient.EndpointsResource
default:
return "", "", "", xdsclient.ErrResourceTypeUnsupported{
return rType, "", "", xdsclient.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
}
}
return resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), err
return rType, resp.GetVersionInfo(), resp.GetNonce(), err
}
// handleLDSResponse processes an LDS response received from the xDS server. On

View File

@ -28,6 +28,7 @@ import (
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
"google.golang.org/grpc/xds/internal/version"
@ -39,22 +40,22 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb
cbCDS = testutils.NewChannel()
cbEDS = testutils.NewChannel()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(typeURL string, d map[string]interface{}) {
t.Logf("Received %s callback with {%+v}", typeURL, d)
switch typeURL {
case version.V2ListenerURL:
f: func(rType xdsclient.ResourceType, d map[string]interface{}) {
t.Logf("Received %v callback with {%+v}", rType, d)
switch rType {
case xdsclient.ListenerResource:
if _, ok := d[goodLDSTarget1]; ok {
cbLDS.Send(struct{}{})
}
case version.V2RouteConfigURL:
case xdsclient.RouteConfigResource:
if _, ok := d[goodRouteName1]; ok {
cbRDS.Send(struct{}{})
}
case version.V2ClusterURL:
case xdsclient.ClusterResource:
if _, ok := d[goodClusterName1]; ok {
cbCDS.Send(struct{}{})
}
case version.V2EndpointsURL:
case xdsclient.EndpointsResource:
if _, ok := d[goodEDSName]; ok {
cbEDS.Send(struct{}{})
}
@ -98,30 +99,24 @@ func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *
// startXDS calls watch to send the first request. It then sends a good response
// and checks for ack.
func startXDS(t *testing.T, xdsname string, v2c *client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest, preVersion string, preNonce string) {
var (
nameToWatch, typeURLToWatch string
)
switch xdsname {
case "LDS":
typeURLToWatch = version.V2ListenerURL
func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest, preVersion string, preNonce string) {
nameToWatch := ""
switch rType {
case xdsclient.ListenerResource:
nameToWatch = goodLDSTarget1
case "RDS":
typeURLToWatch = version.V2RouteConfigURL
case xdsclient.RouteConfigResource:
nameToWatch = goodRouteName1
case "CDS":
typeURLToWatch = version.V2ClusterURL
case xdsclient.ClusterResource:
nameToWatch = goodClusterName1
case "EDS":
typeURLToWatch = version.V2EndpointsURL
case xdsclient.EndpointsResource:
nameToWatch = goodEDSName
}
v2c.AddWatch(typeURLToWatch, nameToWatch)
v2c.AddWatch(rType, nameToWatch)
if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
t.Fatalf("Failed to receive %v request: %v", rType, err)
}
t.Logf("FakeServer received %s request...", xdsname)
t.Logf("FakeServer received %v request...", rType)
}
// sendGoodResp sends the good response, with the given version, and a random
@ -129,19 +124,19 @@ func startXDS(t *testing.T, xdsname string, v2c *client, reqChan *testutils.Chan
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, ver int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (string, error) {
func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeserver.Server, ver int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (string, error) {
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
t.Logf("Good %v response pushed to fakeServer...", rType)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil {
return "", fmt.Errorf("failed to receive %s request: %v", xdsname, err)
return "", fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Good %s response acked", xdsname)
t.Logf("Good %v response acked", rType)
if _, err := callbackCh.Receive(); err != nil {
return "", fmt.Errorf("timeout when expecting %s update", xdsname)
return "", fmt.Errorf("timeout when expecting %v update", rType)
}
t.Logf("Good %s response callback executed", xdsname)
t.Logf("Good %v response callback executed", rType)
return nonce, nil
}
@ -149,27 +144,27 @@ func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, v
// be nacked, so we expect a request with the previous version (version-1).
//
// But the nonce in request should be the new nonce.
func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, ver int, wantReq *xdspb.DiscoveryRequest) error {
func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeserver.Server, ver int, wantReq *xdspb.DiscoveryRequest) error {
var typeURL string
switch xdsname {
case "LDS":
switch rType {
case xdsclient.ListenerResource:
typeURL = version.V2ListenerURL
case "RDS":
case xdsclient.RouteConfigResource:
typeURL = version.V2RouteConfigURL
case "CDS":
case xdsclient.ClusterResource:
typeURL = version.V2ClusterURL
case "EDS":
case xdsclient.EndpointsResource:
typeURL = version.V2EndpointsURL
}
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: typeURL,
}, ver)
t.Logf("Bad %s response pushed to fakeServer...", xdsname)
t.Logf("Bad %v response pushed to fakeServer...", rType)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil {
return fmt.Errorf("failed to receive %s request: %v", xdsname, err)
return fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Bad %s response nacked", xdsname)
t.Logf("Bad %v response nacked", rType)
return nil
}
@ -192,59 +187,59 @@ func (s) TestV2ClientAck(t *testing.T) {
defer v2cCleanup()
// Start the watch, send a good response, and check for ack.
startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
if _, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil {
startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
if _, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil {
t.Fatal(err)
}
versionLDS++
startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest, "", "")
if _, err := sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil {
startXDS(t, xdsclient.RouteConfigResource, v2c, fakeServer.XDSRequestChan, goodRDSRequest, "", "")
if _, err := sendGoodResp(t, xdsclient.RouteConfigResource, fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil {
t.Fatal(err)
}
versionRDS++
startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest, "", "")
if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
startXDS(t, xdsclient.ClusterResource, v2c, fakeServer.XDSRequestChan, goodCDSRequest, "", "")
if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
t.Fatal(err)
}
versionCDS++
startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest, "", "")
if _, err := sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil {
startXDS(t, xdsclient.EndpointsResource, v2c, fakeServer.XDSRequestChan, goodEDSRequest, "", "")
if _, err := sendGoodResp(t, xdsclient.EndpointsResource, fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil {
t.Fatal(err)
}
versionEDS++
// Send a bad response, and check for nack.
if err := sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest); err != nil {
if err := sendBadResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSRequest); err != nil {
t.Fatal(err)
}
versionLDS++
if err := sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest); err != nil {
if err := sendBadResp(t, xdsclient.RouteConfigResource, fakeServer, versionRDS, goodRDSRequest); err != nil {
t.Fatal(err)
}
versionRDS++
if err := sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest); err != nil {
if err := sendBadResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSRequest); err != nil {
t.Fatal(err)
}
versionCDS++
if err := sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest); err != nil {
if err := sendBadResp(t, xdsclient.EndpointsResource, fakeServer, versionEDS, goodEDSRequest); err != nil {
t.Fatal(err)
}
versionEDS++
// send another good response, and check for ack, with the new version.
if _, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil {
t.Fatal(err)
}
versionLDS++
if _, err := sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.RouteConfigResource, fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS); err != nil {
t.Fatal(err)
}
versionRDS++
if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
t.Fatal(err)
}
versionCDS++
if _, err := sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.EndpointsResource, fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS); err != nil {
t.Fatal(err)
}
versionEDS++
@ -262,7 +257,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) {
defer v2cCleanup()
// Start the watch, send a good response, and check for ack.
startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
@ -278,7 +273,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) {
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}
@ -294,14 +289,14 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
defer v2cCleanup()
// Start the watch, send a good response, and check for ack.
startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
nonce, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
nonce, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
if err != nil {
t.Fatal(err)
}
// Start a new watch. The version in the new request should be the version
// from the previous response, thus versionLDS before ++.
startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS), nonce)
startXDS(t, xdsclient.ListenerResource, v2c, fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS), nonce)
versionLDS++
// This is an invalid response after the new watch.
@ -318,7 +313,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
t.Logf("Bad response nacked")
versionLDS++
if _, err := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.ListenerResource, fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS); err != nil {
t.Fatal(err)
}
versionLDS++
@ -336,42 +331,42 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
defer v2cCleanup()
// Start a CDS watch.
v2c.AddWatch(version.V2ClusterURL, goodClusterName1)
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
t.Fatal(err)
}
t.Logf("FakeServer received %s request...", "CDS")
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
// Send a good CDS response, this function waits for the ACK with the right
// version.
nonce, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
nonce, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
if err != nil {
t.Fatal(err)
}
// Cancel the CDS watch, and start a new one. The new watch should have the
// version from the response above.
v2c.RemoveWatch(version.V2ClusterURL, goodClusterName1)
v2c.RemoveWatch(xdsclient.ClusterResource, goodClusterName1)
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
v2c.AddWatch(version.V2ClusterURL, goodClusterName1)
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
// Wait for a request with correct resource names and version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
// Send a bad response with the next version.
if err := sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest); err != nil {
if err := sendBadResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSRequest); err != nil {
t.Fatal(err)
}
versionCDS++
// send another good response, and check for ack, with the new version.
if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
t.Fatal(err)
}
versionCDS++
@ -391,25 +386,25 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
defer v2cCleanup()
// Start a CDS watch.
v2c.AddWatch(version.V2ClusterURL, goodClusterName1)
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
t.Logf("FakeServer received %s request...", "CDS")
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
// send a good response, and check for ack, with the new version.
nonce, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
nonce, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
if err != nil {
t.Fatal(err)
}
// Cancel the watch before the next response is sent. This mimics the case
// watch is canceled while response is on wire.
v2c.RemoveWatch(version.V2ClusterURL, goodClusterName1)
v2c.RemoveWatch(xdsclient.ClusterResource, goodClusterName1)
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
@ -419,7 +414,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
// Send a good response.
nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodCDSResponse1, versionCDS)
t.Logf("Good %s response pushed to fakeServer...", "CDS")
t.Logf("Good %v response pushed to fakeServer...", xdsclient.ClusterResource)
// Expect no ACK because watch was canceled.
if req, err := fakeServer.XDSRequestChan.Receive(); err != testutils.ErrRecvTimeout {
@ -427,24 +422,24 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
}
// Still expected an callback update, because response was good.
if _, err := cbCDS.Receive(); err != nil {
t.Fatalf("Timeout when expecting %s update", "CDS")
t.Fatalf("Timeout when expecting %v update", xdsclient.ClusterResource)
}
// Start a new watch. The new watch should have the nonce from the response
// above, and version from the first good response.
v2c.AddWatch(version.V2ClusterURL, goodClusterName1)
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
// Send a bad response with the next version.
if err := sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest); err != nil {
if err := sendBadResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSRequest); err != nil {
t.Fatal(err)
}
versionCDS++
// send another good response, and check for ack, with the new version.
if _, err := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
if _, err := sendGoodResp(t, xdsclient.ClusterResource, fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS); err != nil {
t.Fatal(err)
}
versionCDS++

View File

@ -153,7 +153,7 @@ func (s) TestCDSHandleResponse(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
typeURL: version.V2ClusterURL,
rType: xdsclient.ClusterResource,
resourceName: goodClusterName1,
responseToHandle: test.cdsResponse,
@ -172,7 +172,7 @@ func (s) TestCDSHandleResponseWithoutWatch(t *testing.T) {
defer cleanup()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(string, map[string]interface{}) {},
f: func(xdsclient.ResourceType, map[string]interface{}) {},
}, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
if err != nil {
t.Fatal(err)

View File

@ -135,7 +135,7 @@ func (s) TestEDSHandleResponse(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
typeURL: version.V2EndpointsURL,
rType: xdsclient.EndpointsResource,
resourceName: goodEDSName,
responseToHandle: test.edsResponse,
wantHandleErr: test.wantErr,
@ -153,7 +153,7 @@ func (s) TestEDSHandleResponseWithoutWatch(t *testing.T) {
defer cleanup()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(string, map[string]interface{}) {},
f: func(xdsclient.ResourceType, map[string]interface{}) {},
}, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
if err != nil {
t.Fatal(err)

View File

@ -23,8 +23,8 @@ import (
"time"
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/version"
)
// TestLDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
@ -113,7 +113,7 @@ func (s) TestLDSHandleResponse(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
typeURL: version.V2ListenerURL,
rType: xdsclient.ListenerResource,
resourceName: goodLDSTarget1,
responseToHandle: test.ldsResponse,
wantHandleErr: test.wantErr,
@ -131,7 +131,7 @@ func (s) TestLDSHandleResponseWithoutWatch(t *testing.T) {
defer cleanup()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(string, map[string]interface{}) {},
f: func(xdsclient.ResourceType, map[string]interface{}) {},
}, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
if err != nil {
t.Fatal(err)

View File

@ -23,9 +23,9 @@ import (
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
"google.golang.org/grpc/xds/internal/version"
)
// doLDS makes a LDS watch, and waits for the response and ack to finish.
@ -34,7 +34,7 @@ import (
// pre-requirement for RDS, and RDS handle would fail without an existing LDS
// watch.
func doLDS(t *testing.T, v2c xdsclient.APIClient, fakeServer *fakeserver.Server) {
v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1)
v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1)
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout waiting for LDS request: %v", err)
}
@ -112,7 +112,7 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
typeURL: version.V2RouteConfigURL,
rType: xdsclient.RouteConfigResource,
resourceName: goodRouteName1,
responseToHandle: test.rdsResponse,
wantHandleErr: test.wantErr,
@ -130,7 +130,7 @@ func (s) TestRDSHandleResponseWithoutLDSWatch(t *testing.T) {
defer cleanup()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(string, map[string]interface{}) {},
f: func(xdsclient.ResourceType, map[string]interface{}) {},
}, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
if err != nil {
t.Fatal(err)
@ -149,7 +149,7 @@ func (s) TestRDSHandleResponseWithoutRDSWatch(t *testing.T) {
defer cleanup()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(string, map[string]interface{}) {},
f: func(xdsclient.ResourceType, map[string]interface{}) {},
}, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
if err != nil {
t.Fatal(err)

View File

@ -331,7 +331,7 @@ var (
)
type watchHandleTestcase struct {
typeURL string
rType xdsclient.ResourceType
resourceName string
responseToHandle *xdspb.DiscoveryResponse
@ -341,7 +341,7 @@ type watchHandleTestcase struct {
}
type testUpdateReceiver struct {
f func(typeURL string, d map[string]interface{})
f func(rType xdsclient.ResourceType, d map[string]interface{})
}
func (t *testUpdateReceiver) NewListeners(d map[string]xdsclient.ListenerUpdate) {
@ -349,7 +349,7 @@ func (t *testUpdateReceiver) NewListeners(d map[string]xdsclient.ListenerUpdate)
for k, v := range d {
dd[k] = v
}
t.newUpdate(version.V2ListenerURL, dd)
t.newUpdate(xdsclient.ListenerResource, dd)
}
func (t *testUpdateReceiver) NewRouteConfigs(d map[string]xdsclient.RouteConfigUpdate) {
@ -357,7 +357,7 @@ func (t *testUpdateReceiver) NewRouteConfigs(d map[string]xdsclient.RouteConfigU
for k, v := range d {
dd[k] = v
}
t.newUpdate(version.V2RouteConfigURL, dd)
t.newUpdate(xdsclient.RouteConfigResource, dd)
}
func (t *testUpdateReceiver) NewClusters(d map[string]xdsclient.ClusterUpdate) {
@ -365,7 +365,7 @@ func (t *testUpdateReceiver) NewClusters(d map[string]xdsclient.ClusterUpdate) {
for k, v := range d {
dd[k] = v
}
t.newUpdate(version.V2ClusterURL, dd)
t.newUpdate(xdsclient.ClusterResource, dd)
}
func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate) {
@ -373,11 +373,11 @@ func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate
for k, v := range d {
dd[k] = v
}
t.newUpdate(version.V2EndpointsURL, dd)
t.newUpdate(xdsclient.EndpointsResource, dd)
}
func (t *testUpdateReceiver) newUpdate(typeURL string, d map[string]interface{}) {
t.f(typeURL, d)
func (t *testUpdateReceiver) newUpdate(rType xdsclient.ResourceType, d map[string]interface{}) {
t.f(rType, d)
}
// testWatchHandle is called to test response handling for each xDS.
@ -397,8 +397,8 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
gotUpdateCh := testutils.NewChannel()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(typeURL string, d map[string]interface{}) {
if typeURL == test.typeURL {
f: func(rType xdsclient.ResourceType, d map[string]interface{}) {
if rType == test.rType {
if u, ok := d[test.resourceName]; ok {
gotUpdateCh.Send(updateErr{u, nil})
}
@ -410,14 +410,14 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
}
defer v2c.Close()
// RDS needs an existin LDS watch for the hostname.
if test.typeURL == version.V2RouteConfigURL {
// RDS needs an existing LDS watch for the hostname.
if test.rType == xdsclient.RouteConfigResource {
doLDS(t, v2c, fakeServer)
}
// Register the watcher, this will also trigger the v2Client to send the xDS
// request.
v2c.AddWatch(test.typeURL, test.resourceName)
v2c.AddWatch(test.rType, test.resourceName)
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
@ -432,14 +432,14 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
// Also note that this won't trigger ACK, so there's no need to clear the
// request channel afterwards.
var handleXDSResp func(response *xdspb.DiscoveryResponse) error
switch test.typeURL {
case version.V2ListenerURL:
switch test.rType {
case xdsclient.ListenerResource:
handleXDSResp = v2c.handleLDSResponse
case version.V2RouteConfigURL:
case xdsclient.RouteConfigResource:
handleXDSResp = v2c.handleRDSResponse
case version.V2ClusterURL:
case xdsclient.ClusterResource:
handleXDSResp = v2c.handleCDSResponse
case version.V2EndpointsURL:
case xdsclient.EndpointsResource:
handleXDSResp = v2c.handleEDSResponse
}
if err := handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr {
@ -524,7 +524,7 @@ func (s) TestV2ClientBackoffAfterRecvError(t *testing.T) {
callbackCh := make(chan struct{})
v2c, err := newV2Client(&testUpdateReceiver{
f: func(string, map[string]interface{}) { close(callbackCh) },
f: func(xdsclient.ResourceType, map[string]interface{}) { close(callbackCh) },
}, cc, goodNodeProto, clientBackoff, nil)
if err != nil {
t.Fatal(err)
@ -532,7 +532,7 @@ func (s) TestV2ClientBackoffAfterRecvError(t *testing.T) {
defer v2c.Close()
t.Log("Started xds v2Client...")
v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1)
v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1)
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
@ -567,8 +567,8 @@ func (s) TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
callbackCh := testutils.NewChannel()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(typeURL string, d map[string]interface{}) {
if typeURL == version.V2ListenerURL {
f: func(rType xdsclient.ResourceType, d map[string]interface{}) {
if rType == xdsclient.ListenerResource {
if u, ok := d[goodLDSTarget1]; ok {
t.Logf("Received LDS callback with ldsUpdate {%+v}", u)
callbackCh.Send(struct{}{})
@ -582,7 +582,7 @@ func (s) TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
defer v2c.Close()
t.Log("Started xds v2Client...")
v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1)
v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1)
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
@ -637,8 +637,8 @@ func (s) TestV2ClientWatchWithoutStream(t *testing.T) {
callbackCh := testutils.NewChannel()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(typeURL string, d map[string]interface{}) {
if typeURL == version.V2ListenerURL {
f: func(rType xdsclient.ResourceType, d map[string]interface{}) {
if rType == xdsclient.ListenerResource {
if u, ok := d[goodLDSTarget1]; ok {
t.Logf("Received LDS callback with ldsUpdate {%+v}", u)
callbackCh.Send(u)
@ -654,7 +654,7 @@ func (s) TestV2ClientWatchWithoutStream(t *testing.T) {
// This watch is started when the xds-ClientConn is in Transient Failure,
// and no xds stream is created.
v2c.AddWatch(version.V2ListenerURL, goodLDSTarget1)
v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1)
// The watcher should receive an update, with a timeout error in it.
if v, err := callbackCh.TimedReceive(100 * time.Millisecond); err == nil {

View File

@ -39,6 +39,15 @@ func init() {
xdsclient.RegisterAPIClientBuilder(clientBuilder{})
}
var (
resourceTypeToURL = map[xdsclient.ResourceType]string{
xdsclient.ListenerResource: version.V2ListenerURL,
xdsclient.RouteConfigResource: version.V2RouteConfigURL,
xdsclient.ClusterResource: version.V2ClusterURL,
xdsclient.EndpointsResource: version.V2EndpointsURL,
}
)
type clientBuilder struct{}
func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) {
@ -93,30 +102,30 @@ type client struct {
}
// AddWatch overrides the transport helper's AddWatch to save the LDS
// resource_name. This is required when handling an RDS response to perform hot
// resource_name. This is required when handling an RDS response to perform host
// matching.
func (v3c *client) AddWatch(resourceType, resourceName string) {
func (v3c *client) AddWatch(rType xdsclient.ResourceType, rName string) {
v3c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL {
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v3c.ldsWatchCount++
if v3c.ldsWatchCount == 1 {
v3c.ldsResourceName = resourceName
v3c.ldsResourceName = rName
}
}
v3c.mu.Unlock()
v3c.TransportHelper.AddWatch(resourceType, resourceName)
v3c.TransportHelper.AddWatch(rType, rName)
}
func (v3c *client) RemoveWatch(resourceType, resourceName string) {
func (v3c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) {
v3c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if resourceType == version.V2ListenerURL || resourceType == version.V3ListenerURL {
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
@ -126,30 +135,29 @@ func (v3c *client) RemoveWatch(resourceType, resourceName string) {
}
}
v3c.mu.Unlock()
v3c.TransportHelper.RemoveWatch(resourceType, resourceName)
v3c.TransportHelper.RemoveWatch(rType, rName)
}
func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
return v3adsgrpc.NewAggregatedDiscoveryServiceClient(v3c.cc).StreamAggregatedResources(v3c.ctx, grpc.WaitForReady(true))
}
// sendRequest sends a request for provided typeURL and resource on the provided
// stream.
// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type
// rType, on the provided stream.
//
// version is the ack version to be sent with the request
// - If this is the new request (not an ack/nack), version will be an empty
// string
// - If this is an ack, version will be the version from the response
// - If this is the new request (not an ack/nack), version will be empty.
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be an empty string
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, typeURL, version, nonce string) error {
// versionMap). If there was no ack before, it will be empty.
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
}
req := &v3discoverypb.DiscoveryRequest{
Node: v3c.nodeProto,
TypeUrl: typeURL,
TypeUrl: resourceTypeToURL[rType],
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
@ -180,10 +188,11 @@ func (v3c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {
return resp, nil
}
func (v3c *client) HandleResponse(r proto.Message) (string, string, string, error) {
func (v3c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) {
rType := xdsclient.UnknownResource
resp, ok := r.(*v3discoverypb.DiscoveryResponse)
if !ok {
return "", "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
}
// Note that the xDS transport protocol is versioned independently of
@ -191,21 +200,26 @@ func (v3c *client) HandleResponse(r proto.Message) (string, string, string, erro
// of resource types using new versions of the transport protocol, or
// vice-versa. Hence we need to handle v3 type_urls as well here.
var err error
switch resp.GetTypeUrl() {
case version.V2ListenerURL, version.V3ListenerURL:
url := resp.GetTypeUrl()
switch {
case xdsclient.IsListenerResource(url):
err = v3c.handleLDSResponse(resp)
case version.V2RouteConfigURL, version.V3RouteConfigURL:
rType = xdsclient.ListenerResource
case xdsclient.IsRouteConfigResource(url):
err = v3c.handleRDSResponse(resp)
case version.V2ClusterURL, version.V3ClusterURL:
rType = xdsclient.RouteConfigResource
case xdsclient.IsClusterResource(url):
err = v3c.handleCDSResponse(resp)
case version.V2EndpointsURL, version.V3EndpointsURL:
rType = xdsclient.ClusterResource
case xdsclient.IsEndpointsResource(url):
err = v3c.handleEDSResponse(resp)
rType = xdsclient.EndpointsResource
default:
return "", "", "", xdsclient.ErrResourceTypeUnsupported{
return rType, "", "", xdsclient.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
}
}
return resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), err
return rType, resp.GetVersionInfo(), resp.GetNonce(), err
}
// handleLDSResponse processes an LDS response received from the xDS server. On

View File

@ -26,5 +26,6 @@ package xds
import (
_ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers.
_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
_ "google.golang.org/grpc/xds/internal/client/v3" // Register the v3 xDS API client.
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
)