xdsclient: completely remove the old WatchCluster API (#6621)

This commit is contained in:
Easwar Swaminathan 2023-09-18 09:00:19 -07:00 committed by GitHub
parent 94d8074c61
commit 92f5ba9783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 129 additions and 221 deletions

View File

@ -138,57 +138,6 @@ func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string,
return val.(string), err
}
// WatchCluster registers a CDS watch.
func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsresource.ClusterUpdate, error)) func() {
// Due to the tree like structure of aggregate clusters, there can be multiple callbacks persisted for each cluster
// node. However, the client doesn't care about the parent child relationship between the nodes, only that it invokes
// the right callback for a particular cluster.
xdsC.cdsCbs[clusterName] = callback
xdsC.cdsWatchCh.Send(clusterName)
return func() {
xdsC.cdsCancelCh.Send(clusterName)
}
}
// WaitForWatchCluster waits for WatchCluster to be invoked on this client and
// returns the clusterName being watched.
func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) {
val, err := xdsC.cdsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}
// InvokeWatchClusterCallback invokes the registered cdsWatch callback.
//
// Not thread safe with WatchCluster. Only call this after
// WaitForWatchCluster.
func (xdsC *Client) InvokeWatchClusterCallback(update xdsresource.ClusterUpdate, err error) {
// Keeps functionality with previous usage of this, if single callback call that callback.
if len(xdsC.cdsCbs) == 1 {
var clusterName string
for cluster := range xdsC.cdsCbs {
clusterName = cluster
}
xdsC.cdsCbs[clusterName](update, err)
} else {
// Have what callback you call with the update determined by the service name in the ClusterUpdate. Left up to the
// caller to make sure the cluster update matches with a persisted callback.
xdsC.cdsCbs[update.ClusterName](update, err)
}
}
// WaitForCancelClusterWatch waits for a CDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) {
clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return clusterNameReceived.(string), err
}
// WatchEndpoints registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
xdsC.edsCbs[clusterName] = callback

View File

@ -32,7 +32,6 @@ import (
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()
// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how

View File

@ -81,37 +81,6 @@ func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.R
return xdsresource.WatchRouteConfig(c, resourceName, watcher)
}
// This is only required temporarily, while we modify the
// clientImpl.WatchCluster API to be implemented via the wrapper WatchCluster()
// API which calls the WatchResource() API.
type clusterWatcher struct {
resourceName string
cb func(xdsresource.ClusterUpdate, error)
}
func (c *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
c.cb(update.Resource, nil)
}
func (c *clusterWatcher) OnError(err error) {
c.cb(xdsresource.ClusterUpdate{}, err)
}
func (c *clusterWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", c.resourceName)
c.cb(xdsresource.ClusterUpdate{}, err)
}
// WatchCluster uses CDS to discover information about the Cluster resource
// identified by resourceName.
//
// WatchCluster can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.ClusterUpdate, error)) (cancel func()) {
watcher := &clusterWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchCluster(c, resourceName, watcher)
}
// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt

View File

@ -120,6 +120,12 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
return lisDefault, lisNonDefault, client, close
}
type noopClusterWatcher struct{}
func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}
// TestAuthorityShare tests the authority sharing logic. The test verifies the
// following scenarios:
// - A watch for a resource name with an authority matching an existing watch
@ -143,14 +149,15 @@ func (s) TestAuthorityShare(t *testing.T) {
}
// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
defer cdsCancel1()
if _, err := lis.NewConnCh.Receive(ctx); err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
}
// Request the second resource. Verify that no new transport is created.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
defer cdsCancel2()
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
@ -159,7 +166,7 @@ func (s) TestAuthorityShare(t *testing.T) {
}
// Request the third resource. Verify that no new transport is created.
cdsCancel3 := client.WatchCluster(authorityTestResourceName2, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel3 := xdsresource.WatchCluster(client, authorityTestResourceName2, watcher)
defer cdsCancel3()
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
@ -179,7 +186,8 @@ func (s) TestAuthorityIdleTimeout(t *testing.T) {
defer close()
// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
@ -187,7 +195,7 @@ func (s) TestAuthorityIdleTimeout(t *testing.T) {
conn := val.(*testutils.ConnWrapper)
// Request the second resource. Verify that no new transport is created.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
@ -225,7 +233,8 @@ func (s) TestAuthorityClientClose(t *testing.T) {
// Request the first resource. Verify that a new transport is created to the
// default management server.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lisDefault.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
@ -235,7 +244,7 @@ func (s) TestAuthorityClientClose(t *testing.T) {
// Request another resource which is served by the non-default authority.
// Verify that a new transport is created to the non-default management
// server.
client.WatchCluster(authorityTestResourceName3, func(u xdsresource.ClusterUpdate, err error) {})
xdsresource.WatchCluster(client, authorityTestResourceName3, watcher)
val, err = lisNonDefault.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
@ -272,7 +281,8 @@ func (s) TestAuthorityRevive(t *testing.T) {
defer close()
// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
@ -284,7 +294,7 @@ func (s) TestAuthorityRevive(t *testing.T) {
// Request the second resource. Verify that no new transport is created.
// This should move the authority out of the idle cache.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
defer cdsCancel2()
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()

View File

@ -41,6 +41,30 @@ import (
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
type clusterWatcher struct {
updateCh *testutils.Channel
}
func newClusterWatcher() *clusterWatcher {
return &clusterWatcher{updateCh: testutils.NewChannel()}
}
func (ew *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
ew.updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update.Resource})
}
func (ew *clusterWatcher) OnError(err error) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: err})
}
func (ew *clusterWatcher) OnResourceDoesNotExist() {
ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
}
// badClusterResource returns a cluster resource for the given name which
// contains a config_source_specifier for the `lrs_server` field which is not
// set to `self`, and hence is expected to be NACKed by the client.
@ -154,10 +178,8 @@ func (s) TestCDSWatch(t *testing.T) {
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
cdsCancel := client.WatchCluster(test.resourceName, func(u xdsresource.ClusterUpdate, err error) {
updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw := newClusterWatcher()
cdsCancel := xdsresource.WatchCluster(client, test.resourceName, cw)
// Configure the management server to return a single cluster
// resource, corresponding to the one we registered a watch for.
@ -173,7 +195,7 @@ func (s) TestCDSWatch(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyClusterUpdate(ctx, updateCh, test.wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw.updateCh, test.wantUpdate); err != nil {
t.Fatal(err)
}
@ -187,7 +209,7 @@ func (s) TestCDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoClusterUpdate(ctx, updateCh); err != nil {
if err := verifyNoClusterUpdate(ctx, cw.updateCh); err != nil {
t.Fatal(err)
}
@ -202,7 +224,7 @@ func (s) TestCDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoClusterUpdate(ctx, updateCh); err != nil {
if err := verifyNoClusterUpdate(ctx, cw.updateCh); err != nil {
t.Fatal(err)
}
})
@ -284,15 +306,11 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
// Register two watches for the same cluster resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(test.resourceName, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, test.resourceName, cw1)
defer cdsCancel1()
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(test.resourceName, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, test.resourceName, cw2)
// Configure the management server to return a single cluster
// resource, corresponding to the one we registered watches for.
@ -308,10 +326,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyClusterUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
if err := verifyClusterUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
@ -322,10 +340,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoClusterUpdate(ctx, updateCh1); err != nil {
if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil {
t.Fatal(err)
}
if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil {
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {
t.Fatal(err)
}
@ -339,10 +357,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyClusterUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, test.wantUpdateV2); err != nil {
t.Fatal(err)
}
if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil {
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {
t.Fatal(err)
}
})
@ -369,23 +387,17 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Register two watches for the same cluster resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, cdsName, cw1)
defer cdsCancel1()
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, cdsName, cw2)
defer cdsCancel2()
// Register the third watch for a different cluster resource, and push the
// received updates onto a channel.
updateCh3 := testutils.NewChannel()
cdsCancel3 := client.WatchCluster(cdsNameNewStyle, func(u xdsresource.ClusterUpdate, err error) {
updateCh3.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw3 := newClusterWatcher()
cdsCancel3 := xdsresource.WatchCluster(client, cdsNameNewStyle, cw3)
defer cdsCancel3()
// Configure the management server to return two cluster resources,
@ -417,13 +429,13 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
EDSServiceName: edsNameNewStyle,
},
}
if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate12); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate12); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyClusterUpdate(ctx, updateCh3, wantUpdate3); err != nil {
if err := verifyClusterUpdate(ctx, cw3.updateCh, wantUpdate3); err != nil {
t.Fatal(err)
}
}
@ -466,10 +478,8 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) {
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, cdsName, cw1)
defer cdsCancel1()
// Configure the management server to return a single cluster
@ -492,7 +502,7 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) {
EDSServiceName: edsName,
},
}
if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
select {
@ -503,12 +513,10 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) {
// Register another watch for the same resource. This should get the update
// from the cache.
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, cdsName, cw2)
defer cdsCancel2()
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// No request should get sent out as part of this watch.
@ -544,10 +552,8 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Register a watch for a resource which is expected to be invoked with an
// error after the watch expiry timer fires.
updateCh := testutils.NewChannel()
cdsCancel := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw := newClusterWatcher()
cdsCancel := xdsresource.WatchCluster(client, cdsName, cw)
defer cdsCancel()
// Wait for the watch expiry timer to fire.
@ -557,7 +563,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyClusterUpdate(ctx, updateCh, xdsresource.ClusterUpdateErrTuple{Err: wantErr}); err != nil {
if err := verifyClusterUpdate(ctx, cw.updateCh, xdsresource.ClusterUpdateErrTuple{Err: wantErr}); err != nil {
t.Fatal(err)
}
}
@ -587,10 +593,8 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
cdsCancel := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw := newClusterWatcher()
cdsCancel := xdsresource.WatchCluster(client, cdsName, cw)
defer cdsCancel()
// Configure the management server to return a single cluster resource,
@ -613,14 +617,14 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
EDSServiceName: edsName,
},
}
if err := verifyClusterUpdate(ctx, updateCh, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Wait for the watch expiry timer to fire, and verify that the callback is
// not invoked.
<-time.After(defaultTestWatchExpiryTimeout)
if err := verifyNoClusterUpdate(ctx, updateCh); err != nil {
if err := verifyNoClusterUpdate(ctx, cw.updateCh); err != nil {
t.Fatal(err)
}
}
@ -650,16 +654,12 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) {
// Register two watches for two cluster resources and have the
// callbacks push the received updates on to a channel.
resourceName1 := cdsName
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(resourceName1, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1)
defer cdsCancel1()
resourceName2 := cdsNameNewStyle
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(resourceName2, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, resourceName1, cw2)
defer cdsCancel2()
// Configure the management server to return two cluster resources,
@ -691,10 +691,10 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) {
EDSServiceName: edsNameNewStyle,
},
}
if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate1); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate1); err != nil {
t.Fatal(err)
}
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate2); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate2); err != nil {
t.Fatal(err)
}
@ -710,10 +710,10 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) {
// The first watcher should receive a resource removed error, while the
// second watcher should not receive an update.
if err := verifyClusterUpdate(ctx, updateCh1, xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
t.Fatal(err)
}
if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil {
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {
t.Fatal(err)
}
@ -727,7 +727,7 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoClusterUpdate(ctx, updateCh1); err != nil {
if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil {
t.Fatal(err)
}
wantUpdate := xdsresource.ClusterUpdateErrTuple{
@ -736,7 +736,7 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) {
EDSServiceName: "new-eds-resource",
},
}
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -758,12 +758,8 @@ func (s) TestCDSWatch_NACKError(t *testing.T) {
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cdsCancel := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) {
updateCh.SendContext(ctx, xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw := newClusterWatcher()
cdsCancel := xdsresource.WatchCluster(client, cdsName, cw)
defer cdsCancel()
// Configure the management server to return a single cluster resource
@ -773,12 +769,14 @@ func (s) TestCDSWatch_NACKError(t *testing.T) {
Clusters: []*v3clusterpb.Cluster{badClusterResource(cdsName, edsName, e2e.SecurityLevelNone)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Verify that the expected error is propagated to the watcher.
u, err := updateCh.Receive(ctx)
u, err := cw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err)
}
@ -808,19 +806,13 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
// Register two watches for cluster resources. The first watch is expected
// to receive an error because the received resource is NACK'ed. The second
// watch is expected to get a good update.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
badResourceName := cdsName
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(badResourceName, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.SendContext(ctx, xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, badResourceName, cw1)
defer cdsCancel1()
goodResourceName := cdsNameNewStyle
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(goodResourceName, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.SendContext(ctx, xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, goodResourceName, cw2)
defer cdsCancel2()
// Configure the management server with two cluster resources. One of these
@ -832,13 +824,15 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
e2e.DefaultCluster(goodResourceName, edsName, e2e.SecurityLevelNone)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Verify that the expected error is propagated to the watcher which is
// watching the bad resource.
u, err := updateCh1.Receive(ctx)
u, err := cw1.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err)
}
@ -855,7 +849,7 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
EDSServiceName: edsName,
},
}
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -882,16 +876,12 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
// Register two watches for two cluster resources and have the
// callbacks push the received updates on to a channel.
resourceName1 := cdsName
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(resourceName1, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1)
defer cdsCancel1()
resourceName2 := cdsNameNewStyle
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(resourceName2, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, resourceName2, cw2)
defer cdsCancel2()
// Configure the management server to return only one of the two cluster
@ -914,12 +904,12 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
EDSServiceName: edsName,
},
}
if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate1); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate1); err != nil {
t.Fatal(err)
}
// Verify that the second watcher does not get an update with an error.
if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil {
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {
t.Fatal(err)
}
@ -944,13 +934,13 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
EDSServiceName: edsNameNewStyle,
},
}
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate2); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate2); err != nil {
t.Fatal(err)
}
// Verify that the first watcher gets no update, as the first resource did
// not change.
if err := verifyNoClusterUpdate(ctx, updateCh1); err != nil {
if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil {
t.Fatal(err)
}
}

View File

@ -125,7 +125,7 @@ func (s) TestDumpResources(t *testing.T) {
client.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
client.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
xdsresource.WatchCluster(client, target, noopClusterWatcher{})
}
for _, target := range edsTargets {
xdsresource.WatchEndpoints(client, target, noopEndpointsWatcher{})

View File

@ -70,11 +70,15 @@ func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData)
}
func (ew *endpointsWatcher) OnError(err error) {
ew.updateCh.SendOrFail(endpointsUpdateErrTuple{err: err})
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
ew.updateCh.Replace(endpointsUpdateErrTuple{err: err})
}
func (ew *endpointsWatcher) OnResourceDoesNotExist() {
ew.updateCh.SendOrFail(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
}
// badEndpointsResource returns a endpoints resource for the given

View File

@ -219,15 +219,11 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) {
// Register two watches for cluster resources with the same query string,
// but context parameters in different order.
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchCluster(resourceName1, func(u xdsresource.ClusterUpdate, err error) {
updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1)
defer cdsCancel1()
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchCluster(resourceName2, func(u xdsresource.ClusterUpdate, err error) {
updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err})
})
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, resourceName2, cw2)
defer cdsCancel2()
// Configure the management server for the non-default authority to return a
@ -250,10 +246,10 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) {
},
}
// Verify the contents of the received update.
if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}

View File

@ -738,18 +738,9 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
// received by the resource watch callback.
type updateAndErr struct {
update xdsresource.ClusterUpdate
err error
}
updateAndErrCh := testutils.NewChannel()
// Register a watch, and push the results on to a channel.
client.WatchCluster(test.resourceName, func(update xdsresource.ClusterUpdate, err error) {
updateAndErrCh.Send(updateAndErr{update: update, err: err})
})
cw := newClusterWatcher()
xdsresource.WatchCluster(client, test.resourceName, cw)
t.Logf("Registered a watch for Cluster %q", test.resourceName)
// Wait for the discovery request to be sent out.
@ -775,12 +766,12 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
// Wait for an update from the xDS client and compare with expected
// update.
val, err = updateAndErrCh.Receive(ctx)
val, err = cw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err)
}
gotUpdate := val.(updateAndErr).update
gotErr := val.(updateAndErr).err
gotUpdate := val.(xdsresource.ClusterUpdateErrTuple).Update
gotErr := val.(xdsresource.ClusterUpdateErrTuple).Err
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}