completely delete WatchListener and WatchRouteConfig APIs (#6849)

Easwar Swaminathan 2023-12-14 16:29:26 -08:00 committed by GitHub
parent 836e5de556
commit 6e6914a7af
16 changed files with 421 additions and 636 deletions
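
For orientation before the file diffs: a minimal sketch (not part of this commit) of the watcher-style registration that replaces the deleted WatchListener/WatchRouteConfig callback APIs. The exampleListenerWatcher type, the watchExampleListener helper and the resource name are illustrative; the xdsresource.WatchListener helper and the OnUpdate/OnError/OnResourceDoesNotExist methods match the watcher interfaces that appear in the diffs below.

import (
	"google.golang.org/grpc/xds/internal/xdsclient"
	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// exampleListenerWatcher is an illustrative implementation of the
// xdsresource.ListenerWatcher interface used by the new watch API.
type exampleListenerWatcher struct{}

func (exampleListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
	// React to the new listener configuration, e.g. update.Resource.RouteConfigName.
}

func (exampleListenerWatcher) OnError(err error) {
	// React to transient errors (e.g. NACKed resources) reported by the xDS client.
}

func (exampleListenerWatcher) OnResourceDoesNotExist() {
	// React to the resource being removed from the management server.
}

func watchExampleListener(client xdsclient.XDSClient) (cancel func()) {
	// Previously: client.WatchListener("example-listener", callback).
	// Now: register a watcher through the xdsresource helper instead.
	return xdsresource.WatchListener(client, "example-listener", exampleListenerWatcher{})
}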

View File

@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// Client is a fake implementation of an xds client. It exposes a bunch of
@ -38,151 +37,10 @@ type Client struct {
xdsclient.XDSClient
name string
ldsWatchCh *testutils.Channel
rdsWatchCh *testutils.Channel
cdsWatchCh *testutils.Channel
edsWatchCh *testutils.Channel
ldsCancelCh *testutils.Channel
rdsCancelCh *testutils.Channel
cdsCancelCh *testutils.Channel
edsCancelCh *testutils.Channel
loadReportCh *testutils.Channel
lrsCancelCh *testutils.Channel
loadStore *load.Store
bootstrapCfg *bootstrap.Config
ldsCb func(xdsresource.ListenerUpdate, error)
rdsCbs map[string]func(xdsresource.RouteConfigUpdate, error)
cdsCbs map[string]func(xdsresource.ClusterUpdate, error)
edsCbs map[string]func(xdsresource.EndpointsUpdate, error)
}
// WatchListener registers a LDS watch.
func (xdsC *Client) WatchListener(serviceName string, callback func(xdsresource.ListenerUpdate, error)) func() {
xdsC.ldsCb = callback
xdsC.ldsWatchCh.Send(serviceName)
return func() {
xdsC.ldsCancelCh.Send(nil)
}
}
// WaitForWatchListener waits for WatchListener to be invoked on this client and
// returns the serviceName being watched.
func (xdsC *Client) WaitForWatchListener(ctx context.Context) (string, error) {
val, err := xdsC.ldsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}
// InvokeWatchListenerCallback invokes the registered ldsWatch callback.
//
// Not thread safe with WatchListener. Only call this after
// WaitForWatchListener.
func (xdsC *Client) InvokeWatchListenerCallback(update xdsresource.ListenerUpdate, err error) {
xdsC.ldsCb(update, err)
}
// WaitForCancelListenerWatch waits for an LDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error {
_, err := xdsC.ldsCancelCh.Receive(ctx)
return err
}
// WatchRouteConfig registers a RDS watch.
func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsresource.RouteConfigUpdate, error)) func() {
xdsC.rdsCbs[routeName] = callback
xdsC.rdsWatchCh.Send(routeName)
return func() {
xdsC.rdsCancelCh.Send(routeName)
}
}
// WaitForWatchRouteConfig waits for WatchRouteConfig to be invoked on this client and
// returns the routeName being watched.
func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error) {
val, err := xdsC.rdsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}
// InvokeWatchRouteConfigCallback invokes the registered rdsWatch callback.
//
// Not thread safe with WatchRouteConfig. Only call this after
// WaitForWatchRouteConfig.
func (xdsC *Client) InvokeWatchRouteConfigCallback(name string, update xdsresource.RouteConfigUpdate, err error) {
if len(xdsC.rdsCbs) != 1 {
xdsC.rdsCbs[name](update, err)
return
}
// Preserves the previous client-side behavior: if there is a single registered
// callback, invoke that callback (regardless of the name passed in).
var routeName string
for route := range xdsC.rdsCbs {
routeName = route
}
xdsC.rdsCbs[routeName](update, err)
}
// WaitForCancelRouteConfigWatch waits for an RDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, error) {
val, err := xdsC.rdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}
// WatchEndpoints registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
xdsC.edsCbs[clusterName] = callback
xdsC.edsWatchCh.Send(clusterName)
return func() {
xdsC.edsCancelCh.Send(clusterName)
}
}
// WaitForWatchEDS waits for WatchEndpoints to be invoked on this client and
// returns the clusterName being watched.
func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) {
val, err := xdsC.edsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}
// InvokeWatchEDSCallback invokes the registered edsWatch callback.
//
// Not thread safe with WatchEndpoints. Only call this after
// WaitForWatchEDS.
func (xdsC *Client) InvokeWatchEDSCallback(name string, update xdsresource.EndpointsUpdate, err error) {
if len(xdsC.edsCbs) != 1 {
// This may panic if name isn't found. But it's fine for tests.
xdsC.edsCbs[name](update, err)
return
}
// Preserves the previous behavior: if there is a single registered callback,
// invoke that callback (regardless of the name passed in).
for n := range xdsC.edsCbs {
name = n
}
xdsC.edsCbs[name](update, err)
}
// WaitForCancelEDSWatch waits for an EDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) {
edsNameReceived, err := xdsC.edsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return edsNameReceived.(string), err
}
// ReportLoadArgs wraps the arguments passed to ReportLoad.
@ -247,20 +105,9 @@ func NewClient() *Client {
func NewClientWithName(name string) *Client {
return &Client{
name: name,
ldsWatchCh: testutils.NewChannelWithSize(10),
rdsWatchCh: testutils.NewChannelWithSize(10),
cdsWatchCh: testutils.NewChannelWithSize(10),
edsWatchCh: testutils.NewChannelWithSize(10),
ldsCancelCh: testutils.NewChannelWithSize(10),
rdsCancelCh: testutils.NewChannelWithSize(10),
cdsCancelCh: testutils.NewChannelWithSize(10),
edsCancelCh: testutils.NewChannelWithSize(10),
loadReportCh: testutils.NewChannel(),
lrsCancelCh: testutils.NewChannel(),
loadStore: load.NewStore(),
bootstrapCfg: &bootstrap.Config{ClientDefaultListenerResourceNameTemplate: "%s"},
rdsCbs: make(map[string]func(xdsresource.RouteConfigUpdate, error)),
cdsCbs: make(map[string]func(xdsresource.ClusterUpdate, error)),
edsCbs: make(map[string]func(xdsresource.EndpointsUpdate, error)),
}
}

View File

@ -24,7 +24,7 @@ import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
// used to receive updates on watches registered with the xDS client, when using
// the resource-type agnostic WatchResource API.
//
// Tests can the channels provided by this tyep to get access to updates and
// Tests can use the channels provided by this type to get access to updates and
// errors sent by the xDS client.
type TestResourceWatcher struct {
// UpdateCh is the channel on which xDS client updates are delivered.

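A hypothetical usage sketch for the TestResourceWatcher described above. Only the UpdateCh field named in the doc comment is relied upon; how the watcher is constructed and registered (e.g. via the client's generic WatchResource method) is an assumption, not shown in this diff.

// Hypothetical test snippet: watcher is a *TestResourceWatcher that has been
// registered with the xDS client via the generic WatchResource API.
select {
case u := <-watcher.UpdateCh:
	t.Logf("received xDS resource update: %+v", u)
case <-ctx.Done():
	t.Fatalf("timeout waiting for an xDS resource update: %v", ctx.Err())
}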
View File

@ -30,9 +30,6 @@ import (
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources.
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
// xDS requests are sent out and how responses are deserialized and
@ -47,9 +44,6 @@ type XDSClient interface {
// During a race (e.g. an xDS response is received while the user is calling
// cancel()), there's a small window where the callback can be called after
// the watcher is canceled. Callers need to handle this case.
//
// TODO: Once this generic client API is fully implemented and integrated,
// delete the resource type specific watch APIs on this interface.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
// DumpResources returns the status of the xDS resources. Returns a map of

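For reference, a hedged sketch of calling the generic WatchResource API described above. The resource-type lookup via internal.ResourceTypeMapForTesting mirrors the test code later in this commit (and assumes the same internal and version package imports as that test file); the resource name and the watcher variable (any xdsresource.ResourceWatcher implementation, such as the noop watchers in the tests below) are illustrative.

// Illustrative only: obtain an xdsresource.Type implementation and register a
// resource-type agnostic watch. The map lookup follows the pattern used in the
// tests touched by this commit.
rdsType := internal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type)
cancel := client.WatchResource(rdsType, "example-route-config", watcher)
defer cancel()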
View File

@ -25,62 +25,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// This type is only required temporarily, while the clientImpl.WatchListener
// API is implemented as a thin wrapper that delegates to the generic
// WatchResource() API.
type listenerWatcher struct {
resourceName string
cb func(xdsresource.ListenerUpdate, error)
}
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.cb(update.Resource, nil)
}
func (l *listenerWatcher) OnError(err error) {
l.cb(xdsresource.ListenerUpdate{}, err)
}
func (l *listenerWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", l.resourceName)
l.cb(xdsresource.ListenerUpdate{}, err)
}
// WatchListener uses LDS to discover information about the Listener resource
// identified by resourceName.
func (c *clientImpl) WatchListener(resourceName string, cb func(xdsresource.ListenerUpdate, error)) (cancel func()) {
watcher := &listenerWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchListener(c, resourceName, watcher)
}
// This type is only required temporarily, while the clientImpl.WatchRouteConfig
// API is implemented as a thin wrapper that delegates to the generic
// WatchResource() API.
type routeConfigWatcher struct {
resourceName string
cb func(xdsresource.RouteConfigUpdate, error)
}
func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.cb(update.Resource, nil)
}
func (r *routeConfigWatcher) OnError(err error) {
r.cb(xdsresource.RouteConfigUpdate{}, err)
}
func (r *routeConfigWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", r.resourceName)
r.cb(xdsresource.RouteConfigUpdate{}, err)
}
// WatchRouteConfig uses RDS to discover information about the
// RouteConfiguration resource identified by resourceName.
func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.RouteConfigUpdate, error)) (cancel func()) {
watcher := &routeConfigWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchRouteConfig(c, resourceName, watcher)
}
// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt

View File

@ -118,12 +118,6 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
return lisDefault, lisNonDefault, client, close
}
type noopClusterWatcher struct{}
func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}
// TestAuthorityShare tests the authority sharing logic. The test verifies the
// following scenarios:
// - A watch for a resource name with an authority matching an existing watch

View File

@ -41,6 +41,17 @@ import (
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
type noopClusterWatcher struct{}
func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}
type clusterUpdateErrTuple struct {
update xdsresource.ClusterUpdate
err error
}
type clusterWatcher struct {
updateCh *testutils.Channel
}
@ -49,20 +60,20 @@ func newClusterWatcher() *clusterWatcher {
return &clusterWatcher{updateCh: testutils.NewChannel()}
}
func (ew *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
ew.updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update.Resource})
func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource})
}
func (ew *clusterWatcher) OnError(err error) {
func (cw *clusterWatcher) OnError(err error) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: err})
cw.updateCh.Replace(clusterUpdateErrTuple{err: err})
}
func (ew *clusterWatcher) OnResourceDoesNotExist() {
ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
func (cw *clusterWatcher) OnResourceDoesNotExist() {
cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
}
// badClusterResource returns a cluster resource for the given name which
@ -83,19 +94,19 @@ const wantClusterNACKErr = "unsupported config_source_specifier"
//
// Returns an error if no update is received before the context deadline expires
// or the received update does not match the expected one.
func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ClusterUpdateErrTuple) error {
func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate clusterUpdateErrTuple) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a cluster resource from the management server: %v", err)
}
got := u.(xdsresource.ClusterUpdateErrTuple)
if wantUpdate.Err != nil {
if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType {
got := u.(clusterUpdateErrTuple)
if wantUpdate.err != nil {
if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType {
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy")}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the cluster resource update: (-want, got):\n%s", diff)
}
return nil
@ -133,7 +144,7 @@ func (s) TestCDSWatch(t *testing.T) {
watchedResource *v3clusterpb.Cluster // The resource being watched.
updatedWatchedResource *v3clusterpb.Cluster // The watched resource after an update.
notWatchedResource *v3clusterpb.Cluster // A resource which is not being watched.
wantUpdate xdsresource.ClusterUpdateErrTuple
wantUpdate clusterUpdateErrTuple
}{
{
desc: "old style resource",
@ -141,8 +152,8 @@ func (s) TestCDSWatch(t *testing.T) {
watchedResource: e2e.DefaultCluster(cdsName, edsName, e2e.SecurityLevelNone),
updatedWatchedResource: e2e.DefaultCluster(cdsName, "new-eds-resource", e2e.SecurityLevelNone),
notWatchedResource: e2e.DefaultCluster("unsubscribed-cds-resource", edsName, e2e.SecurityLevelNone),
wantUpdate: xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate: clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsName,
EDSServiceName: edsName,
},
@ -154,8 +165,8 @@ func (s) TestCDSWatch(t *testing.T) {
watchedResource: e2e.DefaultCluster(cdsNameNewStyle, edsNameNewStyle, e2e.SecurityLevelNone),
updatedWatchedResource: e2e.DefaultCluster(cdsNameNewStyle, "new-eds-resource", e2e.SecurityLevelNone),
notWatchedResource: e2e.DefaultCluster("unsubscribed-cds-resource", edsNameNewStyle, e2e.SecurityLevelNone),
wantUpdate: xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate: clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsNameNewStyle,
EDSServiceName: edsNameNewStyle,
},
@ -249,22 +260,22 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName string
watchedResource *v3clusterpb.Cluster // The resource being watched.
updatedWatchedResource *v3clusterpb.Cluster // The watched resource after an update.
wantUpdateV1 xdsresource.ClusterUpdateErrTuple
wantUpdateV2 xdsresource.ClusterUpdateErrTuple
wantUpdateV1 clusterUpdateErrTuple
wantUpdateV2 clusterUpdateErrTuple
}{
{
desc: "old style resource",
resourceName: cdsName,
watchedResource: e2e.DefaultCluster(cdsName, edsName, e2e.SecurityLevelNone),
updatedWatchedResource: e2e.DefaultCluster(cdsName, "new-eds-resource", e2e.SecurityLevelNone),
wantUpdateV1: xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdateV1: clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsName,
EDSServiceName: edsName,
},
},
wantUpdateV2: xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdateV2: clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsName,
EDSServiceName: "new-eds-resource",
},
@ -275,14 +286,14 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName: cdsNameNewStyle,
watchedResource: e2e.DefaultCluster(cdsNameNewStyle, edsNameNewStyle, e2e.SecurityLevelNone),
updatedWatchedResource: e2e.DefaultCluster(cdsNameNewStyle, "new-eds-resource", e2e.SecurityLevelNone),
wantUpdateV1: xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdateV1: clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsNameNewStyle,
EDSServiceName: edsNameNewStyle,
},
},
wantUpdateV2: xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdateV2: clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsNameNewStyle,
EDSServiceName: "new-eds-resource",
},
@ -414,14 +425,14 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
}
// Verify the contents of the received update for the all watchers.
wantUpdate12 := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate12 := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsName,
EDSServiceName: edsName,
},
}
wantUpdate3 := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate3 := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsNameNewStyle,
EDSServiceName: edsNameNewStyle,
},
@ -492,8 +503,8 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsName,
EDSServiceName: edsName,
},
@ -558,7 +569,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyClusterUpdate(ctx, cw.updateCh, xdsresource.ClusterUpdateErrTuple{Err: wantErr}); err != nil {
if err := verifyClusterUpdate(ctx, cw.updateCh, clusterUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
}
@ -605,8 +616,8 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: cdsName,
EDSServiceName: edsName,
},
@ -673,14 +684,14 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {
}
// Verify the contents of the received update for both watchers.
wantUpdate1 := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate1 := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: resourceName1,
EDSServiceName: edsName,
},
}
wantUpdate2 := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate2 := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: resourceName2,
EDSServiceName: edsNameNewStyle,
},
@ -704,7 +715,7 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {
// The first watcher should receive a resource removed error, while the
// second watcher should not receive an update.
if err := verifyClusterUpdate(ctx, cw1.updateCh, xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
t.Fatal(err)
}
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {
@ -724,8 +735,8 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {
if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil {
t.Fatal(err)
}
wantUpdate := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: resourceName2,
EDSServiceName: "new-eds-resource",
},
@ -773,7 +784,7 @@ func (s) TestCDSWatch_NACKError(t *testing.T) {
if err != nil {
t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err)
}
gotErr := u.(xdsresource.ClusterUpdateErrTuple).Err
gotErr := u.(clusterUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantClusterNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantClusterNACKErr)
}
@ -828,15 +839,15 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
if err != nil {
t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err)
}
gotErr := u.(xdsresource.ClusterUpdateErrTuple).Err
gotErr := u.(clusterUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantClusterNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantClusterNACKErr)
}
// Verify that the watcher watching the good resource receives a good
// update.
wantUpdate := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: goodResourceName,
EDSServiceName: edsName,
},
@ -889,8 +900,8 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
}
// Verify the contents of the received update for first watcher.
wantUpdate1 := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate1 := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: resourceName1,
EDSServiceName: edsName,
},
@ -919,8 +930,8 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
}
// Verify the contents of the received update for the second watcher.
wantUpdate2 := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate2 := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: resourceName2,
EDSServiceName: edsNameNewStyle,
},

View File

@ -60,12 +60,6 @@ func compareDump(ctx context.Context, client xdsclient.XDSClient, want map[strin
}
}
type noopEndpointsWatcher struct{}
func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {}
func (noopEndpointsWatcher) OnError(err error) {}
func (noopEndpointsWatcher) OnResourceDoesNotExist() {}
func (s) TestDumpResources(t *testing.T) {
// Initialize the xDS resources to be used in this test.
ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
@ -119,10 +113,10 @@ func (s) TestDumpResources(t *testing.T) {
// Register watches, dump resources and expect configs in requested state.
for _, target := range ldsTargets {
client.WatchListener(target, func(xdsresource.ListenerUpdate, error) {})
xdsresource.WatchListener(client, target, noopListenerWatcher{})
}
for _, target := range rdsTargets {
client.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {})
xdsresource.WatchRouteConfig(client, target, noopRouteConfigWatcher{})
}
for _, target := range cdsTargets {
xdsresource.WatchCluster(client, target, noopClusterWatcher{})

View File

@ -52,6 +52,12 @@ const (
edsPort3 = 3
)
type noopEndpointsWatcher struct{}
func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {}
func (noopEndpointsWatcher) OnError(err error) {}
func (noopEndpointsWatcher) OnResourceDoesNotExist() {}
type endpointsUpdateErrTuple struct {
update xdsresource.EndpointsUpdate
err error

View File

@ -23,7 +23,6 @@ import (
"testing"
"github.com/google/uuid"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal"
@ -95,15 +94,11 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) {
// Register two watches for listener resources with the same query string,
// but context parameters in different order.
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(resourceName1, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, resourceName1, lw1)
defer ldsCancel1()
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(resourceName2, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, resourceName2, lw2)
defer ldsCancel2()
// Configure the management server for the non-default authority to return a
@ -119,17 +114,17 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: "rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
// Verify the contents of the received update.
if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -151,15 +146,11 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) {
// Register two watches for route configuration resources with the same
// query string, but context parameters in different order.
updateCh1 := testutils.NewChannel()
rdsCancel1 := client.WatchRouteConfig(resourceName1, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw1 := newRouteConfigWatcher()
rdsCancel1 := xdsresource.WatchRouteConfig(client, resourceName1, rw1)
defer rdsCancel1()
updateCh2 := testutils.NewChannel()
rdsCancel2 := client.WatchRouteConfig(resourceName2, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw2 := newRouteConfigWatcher()
rdsCancel2 := xdsresource.WatchRouteConfig(client, resourceName2, rw2)
defer rdsCancel2()
// Configure the management server for the non-default authority to return a
@ -175,8 +166,8 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
wantUpdate := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{"listener-resource"},
@ -192,10 +183,10 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) {
},
}
// Verify the contents of the received update.
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -237,8 +228,8 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
wantUpdate := xdsresource.ClusterUpdateErrTuple{
Update: xdsresource.ClusterUpdate{
wantUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: "xdstp://non-default-authority/envoy.config.cluster.v3.Cluster/xdsclient-test-cds-resource?a=1&b=2",
EDSServiceName: "eds-service-name",
},

View File

@ -72,6 +72,41 @@ const (
edsNameNewStyle = "xdstp:///envoy.config.endpoint.v3.ClusterLoadAssignment/xdsclient-test-eds-resource"
)
type noopListenerWatcher struct{}
func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {}
func (noopListenerWatcher) OnError(err error) {}
func (noopListenerWatcher) OnResourceDoesNotExist() {}
type listenerUpdateErrTuple struct {
update xdsresource.ListenerUpdate
err error
}
type listenerWatcher struct {
updateCh *testutils.Channel
}
func newListenerWatcher() *listenerWatcher {
return &listenerWatcher{updateCh: testutils.NewChannel()}
}
func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
}
func (cw *listenerWatcher) OnError(err error) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
cw.updateCh.Replace(listenerUpdateErrTuple{err: err})
}
func (cw *listenerWatcher) OnResourceDoesNotExist() {
cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
}
// badListenerResource returns a listener resource for the given name which does
// not contain the `RouteSpecifier` field in the HTTPConnectionManager, and
// hence is expected to be NACKed by the client.
@ -115,14 +150,14 @@ func verifyNoListenerUpdate(ctx context.Context, updateCh *testutils.Channel) er
//
// Returns an error if no update is received before the context deadline expires
// or the received update does not match the expected one.
func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdateErrTuple) error {
func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate listenerUpdateErrTuple) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a listener resource from the management server: %v", err)
}
got := u.(xdsresource.ListenerUpdateErrTuple)
if wantUpdate.Err != nil {
if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType {
got := u.(listenerUpdateErrTuple)
if wantUpdate.err != nil {
if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType {
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
@ -131,7 +166,7 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"),
cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"),
}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the listener resource update: (-want, got):\n%s", diff)
}
return nil
@ -155,7 +190,7 @@ func (s) TestLDSWatch(t *testing.T) {
watchedResource *v3listenerpb.Listener // The resource being watched.
updatedWatchedResource *v3listenerpb.Listener // The watched resource after an update.
notWatchedResource *v3listenerpb.Listener // A resource which is not being watched.
wantUpdate xdsresource.ListenerUpdateErrTuple
wantUpdate listenerUpdateErrTuple
}{
{
desc: "old style resource",
@ -163,8 +198,8 @@ func (s) TestLDSWatch(t *testing.T) {
watchedResource: e2e.DefaultClientListener(ldsName, rdsName),
updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"),
notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsName),
wantUpdate: xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate: listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
@ -176,8 +211,8 @@ func (s) TestLDSWatch(t *testing.T) {
watchedResource: e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle),
updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"),
notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsNameNewStyle),
wantUpdate: xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate: listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsNameNewStyle,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
@ -199,10 +234,8 @@ func (s) TestLDSWatch(t *testing.T) {
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
ldsCancel := client.WatchListener(test.resourceName, func(u xdsresource.ListenerUpdate, err error) {
updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, test.resourceName, lw)
// Configure the management server to return a single listener
// resource, corresponding to the one we registered a watch for.
@ -218,7 +251,7 @@ func (s) TestLDSWatch(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyListenerUpdate(ctx, updateCh, test.wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw.updateCh, test.wantUpdate); err != nil {
t.Fatal(err)
}
@ -232,7 +265,7 @@ func (s) TestLDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoListenerUpdate(ctx, updateCh); err != nil {
if err := verifyNoListenerUpdate(ctx, lw.updateCh); err != nil {
t.Fatal(err)
}
@ -247,7 +280,7 @@ func (s) TestLDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoListenerUpdate(ctx, updateCh); err != nil {
if err := verifyNoListenerUpdate(ctx, lw.updateCh); err != nil {
t.Fatal(err)
}
})
@ -273,22 +306,22 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName string
watchedResource *v3listenerpb.Listener // The resource being watched.
updatedWatchedResource *v3listenerpb.Listener // The watched resource after an update.
wantUpdateV1 xdsresource.ListenerUpdateErrTuple
wantUpdateV2 xdsresource.ListenerUpdateErrTuple
wantUpdateV1 listenerUpdateErrTuple
wantUpdateV2 listenerUpdateErrTuple
}{
{
desc: "old style resource",
resourceName: ldsName,
watchedResource: e2e.DefaultClientListener(ldsName, rdsName),
updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"),
wantUpdateV1: xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdateV1: listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
},
wantUpdateV2: xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdateV2: listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: "new-rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
@ -299,14 +332,14 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName: ldsNameNewStyle,
watchedResource: e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle),
updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"),
wantUpdateV1: xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdateV1: listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsNameNewStyle,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
},
wantUpdateV2: xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdateV2: listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: "new-rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
@ -328,15 +361,11 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
// Register two watches for the same listener resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(test.resourceName, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, test.resourceName, lw1)
defer ldsCancel1()
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(test.resourceName, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, test.resourceName, lw2)
// Configure the management server to return a single listener
// resource, corresponding to the one we registered watches for.
@ -352,10 +381,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyListenerUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
if err := verifyListenerUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
@ -366,10 +395,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoListenerUpdate(ctx, updateCh1); err != nil {
if err := verifyNoListenerUpdate(ctx, lw1.updateCh); err != nil {
t.Fatal(err)
}
if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil {
if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil {
t.Fatal(err)
}
@ -383,10 +412,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyListenerUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, test.wantUpdateV2); err != nil {
t.Fatal(err)
}
if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil {
if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil {
t.Fatal(err)
}
})
@ -413,22 +442,16 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Register two watches for the same listener resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
defer ldsCancel1()
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
// Register the third watch for a different listener resource.
updateCh3 := testutils.NewChannel()
ldsCancel3 := client.WatchListener(ldsNameNewStyle, func(u xdsresource.ListenerUpdate, err error) {
updateCh3.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw3 := newListenerWatcher()
ldsCancel3 := xdsresource.WatchListener(client, ldsNameNewStyle, lw3)
defer ldsCancel3()
// Configure the management server to return two listener resources,
@ -450,19 +473,19 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Verify the contents of the received update for the all watchers. The two
// resources returned differ only in the resource name. Therefore the
// expected update is the same for all the watchers.
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyListenerUpdate(ctx, updateCh3, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw3.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -504,10 +527,8 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
defer ldsCancel1()
// Configure the management server to return a single listener
@ -524,13 +545,13 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
select {
@ -541,12 +562,10 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
// Register another watch for the same resource. This should get the update
// from the cache.
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// No request should get sent out as part of this watch.
@ -581,10 +600,8 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
updateCh := testutils.NewChannel()
ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, ldsName, lw)
defer ldsCancel()
// Wait for the watch expiry timer to fire.
@ -594,7 +611,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyListenerUpdate(ctx, updateCh, xdsresource.ListenerUpdateErrTuple{Err: wantErr}); err != nil {
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
}
@ -623,10 +640,8 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, ldsName, lw)
defer ldsCancel()
// Configure the management server to return a single listener
@ -643,20 +658,20 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Wait for the watch expiry timer to fire, and verify that the callback is
// not invoked.
<-time.After(defaultTestWatchExpiryTimeout)
if err := verifyNoListenerUpdate(ctx, updateCh); err != nil {
if err := verifyNoListenerUpdate(ctx, lw.updateCh); err != nil {
t.Fatal(err)
}
}
@ -686,17 +701,13 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
// Register two watches for two listener resources and have the
// callbacks push the received updates on to a channel.
resourceName1 := ldsName
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(resourceName1, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, resourceName1, lw1)
defer ldsCancel1()
resourceName2 := ldsNameNewStyle
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(resourceName2, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, resourceName2, lw2)
defer ldsCancel2()
// Configure the management server to return two listener resources,
@ -718,16 +729,16 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
// Verify the contents of the received update for both watchers. The two
// resources returned differ only in the resource name. Therefore the
// expected update is the same for both watchers.
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
@ -743,12 +754,12 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
// The first watcher should receive a resource removed error, while the
// second watcher should not see an update.
if err := verifyListenerUpdate(ctx, updateCh1, xdsresource.ListenerUpdateErrTuple{
Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""),
if err := verifyListenerUpdate(ctx, lw1.updateCh, listenerUpdateErrTuple{
err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""),
}); err != nil {
t.Fatal(err)
}
if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil {
if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil {
t.Fatal(err)
}
@ -762,16 +773,16 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoListenerUpdate(ctx, updateCh1); err != nil {
if err := verifyNoListenerUpdate(ctx, lw1.updateCh); err != nil {
t.Fatal(err)
}
wantUpdate = xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate = listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: "new-rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -792,12 +803,8 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
updateCh := testutils.NewChannel()
ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh.SendContext(ctx, xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, ldsName, lw)
defer ldsCancel()
// Configure the management server to return a single listener resource
@ -807,16 +814,18 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Verify that the expected error is propagated to the watcher.
u, err := updateCh.Receive(ctx)
u, err := lw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(xdsresource.ListenerUpdateErrTuple).Err
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}
@ -844,16 +853,12 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
badResourceName := ldsName
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(badResourceName, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.SendContext(ctx, xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, badResourceName, lw1)
defer ldsCancel1()
goodResourceName := ldsNameNewStyle
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(goodResourceName, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.SendContext(ctx, xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, goodResourceName, lw2)
defer ldsCancel2()
// Configure the management server with two listener resources. One of these
@ -872,24 +877,24 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
u, err := updateCh1.Receive(ctx)
u, err := lw1.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(xdsresource.ListenerUpdateErrTuple).Err
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}
// Verify that the watcher watching the good resource receives a good
// update.
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -915,17 +920,13 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) {
// Register two watches for two listener resources and have the
// callbacks push the received updates on to a channel.
resourceName1 := ldsName
updateCh1 := testutils.NewChannel()
ldsCancel1 := client.WatchListener(resourceName1, func(u xdsresource.ListenerUpdate, err error) {
updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, resourceName1, lw1)
defer ldsCancel1()
resourceName2 := ldsNameNewStyle
updateCh2 := testutils.NewChannel()
ldsCancel2 := client.WatchListener(resourceName2, func(u xdsresource.ListenerUpdate, err error) {
updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, resourceName2, lw2)
defer ldsCancel2()
// Configure the management server to return only one of the two listener
@ -944,18 +945,18 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) {
}
// Verify the contents of the received update for first watcher.
wantUpdate1 := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate1 := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate1); err != nil {
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate1); err != nil {
t.Fatal(err)
}
// Verify that the second watcher does not get an update with an error.
if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil {
if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil {
t.Fatal(err)
}
@ -974,19 +975,19 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) {
}
// Verify the contents of the received update for the second watcher.
wantUpdate2 := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
wantUpdate2 := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate2); err != nil {
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate2); err != nil {
t.Fatal(err)
}
// Verify that the first watcher gets no update, as the first resource did
// not change.
if err := verifyNoListenerUpdate(ctx, updateCh1); err != nil {
if err := verifyNoListenerUpdate(ctx, lw1.updateCh); err != nil {
t.Fatal(err)
}
}

View File

@ -47,6 +47,51 @@ var (
routeConfigResourceType = internal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type)
)
// This route configuration watcher registers two more watches, for the names
// passed in at creation time, when it receives a valid update.
type testRouteConfigWatcher struct {
client xdsclient.XDSClient
name1, name2 string
rcw1, rcw2 *routeConfigWatcher
cancel1, cancel2 func()
updateCh *testutils.Channel
}
func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) *testRouteConfigWatcher {
return &testRouteConfigWatcher{
client: client,
name1: name1,
name2: name2,
rcw1: newRouteConfigWatcher(),
rcw2: newRouteConfigWatcher(),
updateCh: testutils.NewChannel(),
}
}
func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource})
rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1)
rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2)
}
func (rw *testRouteConfigWatcher) OnError(err error) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err})
}
func (rw *testRouteConfigWatcher) OnResourceDoesNotExist() {
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
}
func (rw *testRouteConfigWatcher) cancel() {
rw.cancel1()
rw.cancel2()
}
// TestWatchCallAnotherWatch tests the scenario where a watch is registered for
// a resource, and more watches are registered from the first watch's callback.
// The test verifies that this scenario does not lead to a deadlock.
@ -78,32 +123,20 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Start a watch for one route configuration resource. From the watch
// callback of the first resource, register two more watches (one for the
// same resource name, which would be satisfied from the cache, and another
// for a different resource name, which would be satisfied from the server).
updateCh1 := testutils.NewChannel()
updateCh2 := testutils.NewChannel()
updateCh3 := testutils.NewChannel()
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
// Watch for the same resource name.
rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
t.Cleanup(rdsCancel2)
// Watch for a different resource name.
rdsCancel3 := client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
t.Cleanup(rdsCancel3)
})
t.Cleanup(rdsCancel1)
// Create a route configuration watcher that registers two more watches from
// the OnUpdate callback:
// - one for the same resource name as this watch, which would be
// satisfied from xdsClient cache
// - the other for a different resource name, which would be
// satisfied from the server
rw := newTestRouteConfigWatcher(client, rdsName, rdsNameNewStyle)
defer rw.cancel()
rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw)
defer rdsCancel()
// Verify the contents of the received update for all the watchers.
wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate12 := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -118,8 +151,8 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
},
},
}
wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate3 := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
@ -134,13 +167,13 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw.updateCh, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw.rcw1.updateCh, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw.rcw2.updateCh, wantUpdate3); err != nil {
t.Fatal(err)
}
}


@ -42,6 +42,41 @@ import (
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
type noopRouteConfigWatcher struct{}
func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {}
func (noopRouteConfigWatcher) OnError(err error) {}
func (noopRouteConfigWatcher) OnResourceDoesNotExist() {}
type routeConfigUpdateErrTuple struct {
update xdsresource.RouteConfigUpdate
err error
}
type routeConfigWatcher struct {
updateCh *testutils.Channel
}
func newRouteConfigWatcher() *routeConfigWatcher {
return &routeConfigWatcher{updateCh: testutils.NewChannel()}
}
func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource})
}
func (rw *routeConfigWatcher) OnError(err error) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err})
}
func (rw *routeConfigWatcher) OnResourceDoesNotExist() {
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
}
// badRouteConfigResource returns a RouteConfiguration resource for the given
// routeName which contains a retry config with num_retries set to `0`. This is
// expected to be NACK'ed by the xDS client.
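// A minimal sketch of what such a helper might look like (its body is not
// part of this diff), assuming the go-control-plane v3routepb types used
// elsewhere in this file and the wrapperspb package:
func badRouteConfigResource(routeName, ldsTarget, clusterName string) *v3routepb.RouteConfiguration {
	return &v3routepb.RouteConfiguration{
		Name: routeName,
		VirtualHosts: []*v3routepb.VirtualHost{{
			Domains: []string{ldsTarget},
			Routes: []*v3routepb.Route{{
				Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
				Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
					ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
				}},
			}},
			// num_retries set to 0 is invalid and is expected to be NACKed.
			RetryPolicy: &v3routepb.RetryPolicy{NumRetries: &wrapperspb.UInt32Value{Value: 0}},
		}},
	}
}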
@ -72,19 +107,19 @@ const wantRouteConfigNACKErr = "received route is invalid: retry_policy.num_retr
//
// Returns an error if no update is received before the context deadline expires
// or the received update does not match the expected one.
func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.RouteConfigUpdateErrTuple) error {
func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate routeConfigUpdateErrTuple) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a route configuration resource from the management server: %v", err)
}
got := u.(xdsresource.RouteConfigUpdateErrTuple)
if wantUpdate.Err != nil {
if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType {
got := u.(routeConfigUpdateErrTuple)
if wantUpdate.err != nil {
if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType {
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw")}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the route configuration resource update: (-want, got):\n%s", diff)
}
return nil
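// verifyNoRouteConfigUpdate, used by several tests below, is not part of this
// hunk. A minimal sketch of its assumed behavior: wait for a short, test-local
// duration (defaultTestShortTimeout is an assumed constant) and fail if any
// update arrives on the channel in that window:
//
//	func verifyNoRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel) error {
//		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
//		defer sCancel()
//		if u, err := updateCh.Receive(sCtx); err != context.DeadlineExceeded {
//			return fmt.Errorf("unexpected RouteConfigUpdate: %v, %v", u, err)
//		}
//		return nil
//	}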
@ -123,7 +158,7 @@ func (s) TestRDSWatch(t *testing.T) {
watchedResource *v3routepb.RouteConfiguration // The resource being watched.
updatedWatchedResource *v3routepb.RouteConfiguration // The watched resource after an update.
notWatchedResource *v3routepb.RouteConfiguration // A resource which is not being watched.
wantUpdate xdsresource.RouteConfigUpdateErrTuple
wantUpdate routeConfigUpdateErrTuple
}{
{
desc: "old style resource",
@ -131,8 +166,8 @@ func (s) TestRDSWatch(t *testing.T) {
watchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, cdsName),
updatedWatchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, "new-cds-resource"),
notWatchedResource: e2e.DefaultRouteConfig("unsubscribed-rds-resource", ldsName, cdsName),
wantUpdate: xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate: routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -154,8 +189,8 @@ func (s) TestRDSWatch(t *testing.T) {
watchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsNameNewStyle),
updatedWatchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, "new-cds-resource"),
notWatchedResource: e2e.DefaultRouteConfig("unsubscribed-rds-resource", ldsNameNewStyle, cdsNameNewStyle),
wantUpdate: xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate: routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
@ -187,10 +222,8 @@ func (s) TestRDSWatch(t *testing.T) {
// Register a watch for a route configuration resource and have the
// watch callback push the received update on to a channel.
updateCh := testutils.NewChannel()
rdsCancel := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw := newRouteConfigWatcher()
rdsCancel := xdsresource.WatchRouteConfig(client, test.resourceName, rw)
// Configure the management server to return a single route
// configuration resource, corresponding to the one being watched.
@ -206,7 +239,7 @@ func (s) TestRDSWatch(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyRouteConfigUpdate(ctx, updateCh, test.wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw.updateCh, test.wantUpdate); err != nil {
t.Fatal(err)
}
@ -220,7 +253,7 @@ func (s) TestRDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil {
if err := verifyNoRouteConfigUpdate(ctx, rw.updateCh); err != nil {
t.Fatal(err)
}
@ -235,7 +268,7 @@ func (s) TestRDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil {
if err := verifyNoRouteConfigUpdate(ctx, rw.updateCh); err != nil {
t.Fatal(err)
}
})
@ -261,16 +294,16 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName string
watchedResource *v3routepb.RouteConfiguration // The resource being watched.
updatedWatchedResource *v3routepb.RouteConfiguration // The watched resource after an update.
wantUpdateV1 xdsresource.RouteConfigUpdateErrTuple
wantUpdateV2 xdsresource.RouteConfigUpdateErrTuple
wantUpdateV1 routeConfigUpdateErrTuple
wantUpdateV2 routeConfigUpdateErrTuple
}{
{
desc: "old style resource",
resourceName: rdsName,
watchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, cdsName),
updatedWatchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, "new-cds-resource"),
wantUpdateV1: xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdateV1: routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -285,8 +318,8 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
},
},
},
wantUpdateV2: xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdateV2: routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -307,8 +340,8 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName: rdsNameNewStyle,
watchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsNameNewStyle),
updatedWatchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, "new-cds-resource"),
wantUpdateV1: xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdateV1: routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
@ -323,8 +356,8 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
},
},
},
wantUpdateV2: xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdateV2: routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
@ -356,15 +389,11 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
// Register two watches for the same route configuration resource
// and have the callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
rdsCancel1 := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw1 := newRouteConfigWatcher()
rdsCancel1 := xdsresource.WatchRouteConfig(client, test.resourceName, rw1)
defer rdsCancel1()
updateCh2 := testutils.NewChannel()
rdsCancel2 := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw2 := newRouteConfigWatcher()
rdsCancel2 := xdsresource.WatchRouteConfig(client, test.resourceName, rw2)
// Configure the management server to return a single route
// configuration resource, corresponding to the one being watched.
@ -380,10 +409,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyRouteConfigUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
@ -394,10 +423,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoRouteConfigUpdate(ctx, updateCh1); err != nil {
if err := verifyNoRouteConfigUpdate(ctx, rw1.updateCh); err != nil {
t.Fatal(err)
}
if err := verifyNoRouteConfigUpdate(ctx, updateCh2); err != nil {
if err := verifyNoRouteConfigUpdate(ctx, rw2.updateCh); err != nil {
t.Fatal(err)
}
@ -411,10 +440,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, test.wantUpdateV2); err != nil {
t.Fatal(err)
}
if err := verifyNoRouteConfigUpdate(ctx, updateCh2); err != nil {
if err := verifyNoRouteConfigUpdate(ctx, rw2.updateCh); err != nil {
t.Fatal(err)
}
})
@ -441,22 +470,16 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Register two watches for the same route configuration resource
// and have the callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw1 := newRouteConfigWatcher()
rdsCancel1 := xdsresource.WatchRouteConfig(client, rdsName, rw1)
defer rdsCancel1()
updateCh2 := testutils.NewChannel()
rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw2 := newRouteConfigWatcher()
rdsCancel2 := xdsresource.WatchRouteConfig(client, rdsName, rw2)
defer rdsCancel2()
// Register the third watch for a different route configuration resource.
updateCh3 := testutils.NewChannel()
rdsCancel3 := client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw3 := newRouteConfigWatcher()
rdsCancel3 := xdsresource.WatchRouteConfig(client, rdsNameNewStyle, rw3)
defer rdsCancel3()
// Configure the management server to return two route configuration
@ -478,8 +501,8 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Verify the contents of the received update for all the watchers. The two
// resources returned differ only in the resource name. Therefore the
// expected update is the same for all the watchers.
wantUpdate := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -494,13 +517,13 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw3.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -542,10 +565,8 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) {
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
updateCh1 := testutils.NewChannel()
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw1 := newRouteConfigWatcher()
rdsCancel1 := xdsresource.WatchRouteConfig(client, rdsName, rw1)
defer rdsCancel1()
// Configure the management server to return a single route configuration
@ -562,8 +583,8 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -578,7 +599,7 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) {
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
select {
@ -589,12 +610,10 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) {
// Register another watch for the same resource. This should get the update
// from the cache.
updateCh2 := testutils.NewChannel()
rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw2 := newRouteConfigWatcher()
rdsCancel2 := xdsresource.WatchRouteConfig(client, rdsName, rw2)
defer rdsCancel2()
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// No request should get sent out as part of this watch.
@ -630,10 +649,8 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
updateCh := testutils.NewChannel()
rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw := newRouteConfigWatcher()
rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw)
defer rdsCancel()
// Wait for the watch expiry timer to fire.
@ -643,7 +660,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyRouteConfigUpdate(ctx, updateCh, xdsresource.RouteConfigUpdateErrTuple{Err: wantErr}); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw.updateCh, routeConfigUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
}
@ -672,10 +689,8 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw := newRouteConfigWatcher()
rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw)
defer rdsCancel()
// Configure the management server to return a single route configuration
@ -692,8 +707,8 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -708,14 +723,14 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Wait for the watch expiry timer to fire, and verify that the callback is
// not invoked.
<-time.After(defaultTestWatchExpiryTimeout)
if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil {
if err := verifyNoRouteConfigUpdate(ctx, rw.updateCh); err != nil {
t.Fatal(err)
}
}
@ -736,12 +751,8 @@ func (s) TestRDSWatch_NACKError(t *testing.T) {
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
updateCh := testutils.NewChannel()
rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw := newRouteConfigWatcher()
rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw)
defer rdsCancel()
// Configure the management server to return a single route configuration
@ -751,16 +762,18 @@ func (s) TestRDSWatch_NACKError(t *testing.T) {
Routes: []*v3routepb.RouteConfiguration{badRouteConfigResource(rdsName, ldsName, cdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Verify that the expected error is propagated to the watcher.
u, err := updateCh.Receive(ctx)
u, err := rw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a route configuration resource from the management server: %v", err)
}
gotErr := u.(xdsresource.RouteConfigUpdateErrTuple).Err
gotErr := u.(routeConfigUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantRouteConfigNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantRouteConfigNACKErr)
}
@ -788,16 +801,12 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
badResourceName := rdsName
updateCh1 := testutils.NewChannel()
rdsCancel1 := client.WatchRouteConfig(badResourceName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw1 := newRouteConfigWatcher()
rdsCancel1 := xdsresource.WatchRouteConfig(client, badResourceName, rw1)
defer rdsCancel1()
goodResourceName := rdsNameNewStyle
updateCh2 := testutils.NewChannel()
rdsCancel2 := client.WatchRouteConfig(goodResourceName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
rw2 := newRouteConfigWatcher()
rdsCancel2 := xdsresource.WatchRouteConfig(client, goodResourceName, rw2)
defer rdsCancel2()
// Configure the management server to return two route configuration
@ -816,19 +825,19 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) {
// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
u, err := updateCh1.Receive(ctx)
u, err := rw1.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a route configuration resource from the management server: %v", err)
}
gotErr := u.(xdsresource.RouteConfigUpdateErrTuple).Err
gotErr := u.(routeConfigUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantRouteConfigNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantRouteConfigNACKErr)
}
// Verify that the watcher watching the good resource receives a good
// update.
wantUpdate := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
wantUpdate := routeConfigUpdateErrTuple{
update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
@ -843,7 +852,7 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) {
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}


@ -251,18 +251,10 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
// received by the resource watch callback.
type updateAndErr struct {
update xdsresource.ListenerUpdate
err error
}
updateAndErrCh := testutils.NewChannel()
// Register a watch, and push the results on to a channel.
client.WatchListener(test.resourceName, func(update xdsresource.ListenerUpdate, err error) {
updateAndErrCh.Send(updateAndErr{update: update, err: err})
})
lw := newListenerWatcher()
cancel := xdsresource.WatchListener(client, test.resourceName, lw)
defer cancel()
t.Logf("Registered a watch for Listener %q", test.resourceName)
// Wait for the discovery request to be sent out.
@ -288,12 +280,12 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
// Wait for an update from the xDS client and compare with expected
// update.
val, err = updateAndErrCh.Receive(ctx)
val, err = lw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err)
}
gotUpdate := val.(updateAndErr).update
gotErr := val.(updateAndErr).err
gotUpdate := val.(listenerUpdateErrTuple).update
gotErr := val.(listenerUpdateErrTuple).err
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}
@ -513,18 +505,10 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
// received by the resource watch callback.
type updateAndErr struct {
update xdsresource.RouteConfigUpdate
err error
}
updateAndErrCh := testutils.NewChannel()
// Register a watch, and push the results on to a channel.
client.WatchRouteConfig(test.resourceName, func(update xdsresource.RouteConfigUpdate, err error) {
updateAndErrCh.Send(updateAndErr{update: update, err: err})
})
rw := newRouteConfigWatcher()
cancel := xdsresource.WatchRouteConfig(client, test.resourceName, rw)
defer cancel()
t.Logf("Registered a watch for Route Configuration %q", test.resourceName)
// Wait for the discovery request to be sent out.
@ -550,12 +534,12 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
// Wait for an update from the xDS client and compare with expected
// update.
val, err = updateAndErrCh.Receive(ctx)
val, err = rw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err)
}
gotUpdate := val.(updateAndErr).update
gotErr := val.(updateAndErr).err
gotUpdate := val.(routeConfigUpdateErrTuple).update
gotErr := val.(routeConfigUpdateErrTuple).err
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}
@ -740,7 +724,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
// Register a watch, and push the results on to a channel.
cw := newClusterWatcher()
xdsresource.WatchCluster(client, test.resourceName, cw)
cancel := xdsresource.WatchCluster(client, test.resourceName, cw)
defer cancel()
t.Logf("Registered a watch for Cluster %q", test.resourceName)
// Wait for the discovery request to be sent out.
@ -770,8 +755,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
if err != nil {
t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err)
}
gotUpdate := val.(xdsresource.ClusterUpdateErrTuple).Update
gotErr := val.(xdsresource.ClusterUpdateErrTuple).Err
gotUpdate := val.(clusterUpdateErrTuple).update
gotErr := val.(clusterUpdateErrTuple).err
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}
@ -1052,8 +1037,8 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
// Register a watch, and push the results on to a channel.
ew := newEndpointsWatcher()
edsCancel := xdsresource.WatchEndpoints(client, test.resourceName, ew)
defer edsCancel()
cancel := xdsresource.WatchEndpoints(client, test.resourceName, ew)
defer cancel()
t.Logf("Registered a watch for Endpoint %q", test.resourceName)
// Wait for the discovery request to be sent out.

View File

@ -86,11 +86,3 @@ type ClusterUpdate struct {
// Raw is the resource from the xds response.
Raw *anypb.Any
}
// ClusterUpdateErrTuple is a tuple with the update and error. It contains the
// results from unmarshal functions. It's used to pass unmarshal results of
// multiple resources together, e.g. in maps like `map[string]{Update,error}`.
type ClusterUpdateErrTuple struct {
Update ClusterUpdate
Err error
}


@ -77,11 +77,3 @@ type InboundListenerConfig struct {
// FilterChains is the list of filter chains associated with this listener.
FilterChains *FilterChainManager
}
// ListenerUpdateErrTuple is a tuple with the update and error. It contains the
// results from unmarshal functions. It's used to pass unmarshal results of
// multiple resources together, e.g. in maps like `map[string]{Update,error}`.
type ListenerUpdateErrTuple struct {
Update ListenerUpdate
Err error
}


@ -246,11 +246,3 @@ func (sc *SecurityConfig) Equal(other *SecurityConfig) bool {
}
return true
}
// RouteConfigUpdateErrTuple is a tuple with the update and error. It contains
// the results from unmarshal functions. It's used to pass unmarshal results of
// multiple resources together, e.g. in maps like `map[string]{Update,error}`.
type RouteConfigUpdateErrTuple struct {
Update RouteConfigUpdate
Err error
}