xds: switch EDS watch to new generic xdsClient API (#6414)

This commit is contained in:
Easwar Swaminathan 2023-06-27 13:37:55 -07:00 committed by GitHub
parent e8599844e7
commit 7eb57278c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 157 additions and 194 deletions

View File

@ -25,11 +25,8 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
type edsResourceWatcher interface {
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()
}
type edsDiscoveryMechanism struct {
nameToWatch string
cancelWatch func()
topLevelResolver topLevelResolver
stopped *grpcsync.Event
@ -64,31 +61,44 @@ func (er *edsDiscoveryMechanism) stop() {
er.cancelWatch()
}
func (er *edsDiscoveryMechanism) handleEndpointsUpdate(update xdsresource.EndpointsUpdate, err error) {
// newEDSResolver returns an implementation of the endpointsResolver interface
// that uses EDS to resolve the given name to endpoints.
func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
ret := &edsDiscoveryMechanism{
nameToWatch: nameToWatch,
topLevelResolver: topLevelResolver,
stopped: grpcsync.NewEvent(),
}
ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret)
return ret
}
// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
if er.stopped.HasFired() {
return
}
if err != nil {
er.topLevelResolver.onError(err)
return
}
er.mu.Lock()
er.update = update
er.update = update.Resource
er.updateReceived = true
er.mu.Unlock()
er.topLevelResolver.onUpdate()
}
// newEDSResolver returns an implementation of the endpointsResolver interface
// that uses EDS to resolve the given name to endpoints.
func newEDSResolver(nameToWatch string, watcher edsResourceWatcher, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
ret := &edsDiscoveryMechanism{
topLevelResolver: topLevelResolver,
stopped: grpcsync.NewEvent(),
func (er *edsDiscoveryMechanism) OnError(err error) {
if er.stopped.HasFired() {
return
}
ret.cancelWatch = watcher.WatchEndpoints(nameToWatch, ret.handleEndpointsUpdate)
return ret
er.topLevelResolver.onError(err)
}
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
if er.stopped.HasFired() {
return
}
er.topLevelResolver.onError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", er.nameToWatch))
}

View File

@ -33,7 +33,6 @@ type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()
// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how

View File

@ -112,37 +112,6 @@ func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.Clust
return xdsresource.WatchCluster(c, resourceName, watcher)
}
// This is only required temporarily, while we modify the
// clientImpl.WatchEndpoints API to be implemented via the wrapper
// WatchEndpoints() API which calls the WatchResource() API.
type endpointsWatcher struct {
resourceName string
cb func(xdsresource.EndpointsUpdate, error)
}
func (c *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {
c.cb(update.Resource, nil)
}
func (c *endpointsWatcher) OnError(err error) {
c.cb(xdsresource.EndpointsUpdate{}, err)
}
func (c *endpointsWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", c.resourceName)
c.cb(xdsresource.EndpointsUpdate{}, err)
}
// WatchEndpoints uses EDS to discover information about the
// ClusterLoadAssignment resource identified by resourceName.
//
// WatchEndpoints can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
func (c *clientImpl) WatchEndpoints(resourceName string, cb func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
watcher := &endpointsWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchEndpoints(c, resourceName, watcher)
}
// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt

View File

@ -60,6 +60,12 @@ func compareDump(ctx context.Context, client xdsclient.XDSClient, want map[strin
}
}
type noopEndpointsWatcher struct{}
func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {}
func (noopEndpointsWatcher) OnError(err error) {}
func (noopEndpointsWatcher) OnResourceDoesNotExist() {}
func (s) TestDumpResources(t *testing.T) {
// Initialize the xDS resources to be used in this test.
ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
@ -122,7 +128,7 @@ func (s) TestDumpResources(t *testing.T) {
client.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
client.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {})
xdsresource.WatchEndpoints(client, target, noopEndpointsWatcher{})
}
want := map[string]map[string]xdsresource.UpdateWithMD{
"type.googleapis.com/envoy.config.listener.v3.Listener": {

View File

@ -52,6 +52,31 @@ const (
edsPort3 = 3
)
type endpointsUpdateErrTuple struct {
update xdsresource.EndpointsUpdate
err error
}
type endpointsWatcher struct {
updateCh *testutils.Channel
}
func newEndpointsWatcher() *endpointsWatcher {
return &endpointsWatcher{updateCh: testutils.NewChannel()}
}
func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {
ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource})
}
func (ew *endpointsWatcher) OnError(err error) {
ew.updateCh.SendOrFail(endpointsUpdateErrTuple{err: err})
}
func (ew *endpointsWatcher) OnResourceDoesNotExist() {
ew.updateCh.SendOrFail(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
}
// badEndpointsResource returns an endpoints resource for the given
// edsServiceName which contains an endpoint with a load_balancing weight of
// `0`. This is expected to be NACK'ed by the xDS client.
@ -71,19 +96,19 @@ const wantEndpointsNACKErr = "EDS response contains an endpoint with zero weight
//
// Returns an error if no update is received before the context deadline expires
// or the received update does not match the expected one.
func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.EndpointsUpdateErrTuple) error {
func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate endpointsUpdateErrTuple) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a endpoints resource from the management server: %v", err)
}
got := u.(xdsresource.EndpointsUpdateErrTuple)
if wantUpdate.Err != nil {
if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType {
got := u.(endpointsUpdateErrTuple)
if wantUpdate.err != nil {
if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType {
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.EndpointsUpdate{}, "Raw")}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the endpoints resource update: (-want, got):\n%s", diff)
}
return nil
@ -121,7 +146,7 @@ func (s) TestEDSWatch(t *testing.T) {
watchedResource *v3endpointpb.ClusterLoadAssignment // The resource being watched.
updatedWatchedResource *v3endpointpb.ClusterLoadAssignment // The watched resource after an update.
notWatchedResource *v3endpointpb.ClusterLoadAssignment // A resource which is not being watched.
wantUpdate xdsresource.EndpointsUpdateErrTuple
wantUpdate endpointsUpdateErrTuple
}{
{
desc: "old style resource",
@ -129,8 +154,8 @@ func (s) TestEDSWatch(t *testing.T) {
watchedResource: e2e.DefaultEndpoint(edsName, edsHost1, []uint32{edsPort1}),
updatedWatchedResource: e2e.DefaultEndpoint(edsName, edsHost2, []uint32{edsPort2}),
notWatchedResource: e2e.DefaultEndpoint("unsubscribed-eds-resource", edsHost3, []uint32{edsPort3}),
wantUpdate: xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate: endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -152,8 +177,8 @@ func (s) TestEDSWatch(t *testing.T) {
watchedResource: e2e.DefaultEndpoint(edsNameNewStyle, edsHost1, []uint32{edsPort1}),
updatedWatchedResource: e2e.DefaultEndpoint(edsNameNewStyle, edsHost2, []uint32{edsPort2}),
notWatchedResource: e2e.DefaultEndpoint("unsubscribed-eds-resource", edsHost3, []uint32{edsPort3}),
wantUpdate: xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate: endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -186,10 +211,8 @@ func (s) TestEDSWatch(t *testing.T) {
// Register a watch for an endpoint resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
edsCancel := client.WatchEndpoints(test.resourceName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew := newEndpointsWatcher()
edsCancel := xdsresource.WatchEndpoints(client, test.resourceName, ew)
// Configure the management server to return a single endpoint
// resource, corresponding to the one being watched.
@ -205,7 +228,7 @@ func (s) TestEDSWatch(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyEndpointsUpdate(ctx, updateCh, test.wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew.updateCh, test.wantUpdate); err != nil {
t.Fatal(err)
}
@ -219,7 +242,7 @@ func (s) TestEDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoEndpointsUpdate(ctx, updateCh); err != nil {
if err := verifyNoEndpointsUpdate(ctx, ew.updateCh); err != nil {
t.Fatal(err)
}
@ -234,7 +257,7 @@ func (s) TestEDSWatch(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoEndpointsUpdate(ctx, updateCh); err != nil {
if err := verifyNoEndpointsUpdate(ctx, ew.updateCh); err != nil {
t.Fatal(err)
}
})
@ -260,16 +283,16 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName string
watchedResource *v3endpointpb.ClusterLoadAssignment // The resource being watched.
updatedWatchedResource *v3endpointpb.ClusterLoadAssignment // The watched resource after an update.
wantUpdateV1 xdsresource.EndpointsUpdateErrTuple
wantUpdateV2 xdsresource.EndpointsUpdateErrTuple
wantUpdateV1 endpointsUpdateErrTuple
wantUpdateV2 endpointsUpdateErrTuple
}{
{
desc: "old style resource",
resourceName: edsName,
watchedResource: e2e.DefaultEndpoint(edsName, edsHost1, []uint32{edsPort1}),
updatedWatchedResource: e2e.DefaultEndpoint(edsName, edsHost2, []uint32{edsPort2}),
wantUpdateV1: xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdateV1: endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -284,8 +307,8 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
},
},
},
wantUpdateV2: xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdateV2: endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}},
@ -306,8 +329,8 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
resourceName: edsNameNewStyle,
watchedResource: e2e.DefaultEndpoint(edsNameNewStyle, edsHost1, []uint32{edsPort1}),
updatedWatchedResource: e2e.DefaultEndpoint(edsNameNewStyle, edsHost2, []uint32{edsPort2}),
wantUpdateV1: xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdateV1: endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -322,8 +345,8 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
},
},
},
wantUpdateV2: xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdateV2: endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}},
@ -356,15 +379,11 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
// Register two watches for the same endpoint resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
edsCancel1 := client.WatchEndpoints(test.resourceName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh1.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew1 := newEndpointsWatcher()
edsCancel1 := xdsresource.WatchEndpoints(client, test.resourceName, ew1)
defer edsCancel1()
updateCh2 := testutils.NewChannel()
edsCancel2 := client.WatchEndpoints(test.resourceName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh2.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew2 := newEndpointsWatcher()
edsCancel2 := xdsresource.WatchEndpoints(client, test.resourceName, ew2)
// Configure the management server to return a single endpoint
// resource, corresponding to the one being watched.
@ -380,10 +399,10 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
}
// Verify the contents of the received update.
if err := verifyEndpointsUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil {
if err := verifyEndpointsUpdate(ctx, ew1.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
if err := verifyEndpointsUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil {
if err := verifyEndpointsUpdate(ctx, ew2.updateCh, test.wantUpdateV1); err != nil {
t.Fatal(err)
}
@ -394,10 +413,10 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyNoEndpointsUpdate(ctx, updateCh1); err != nil {
if err := verifyNoEndpointsUpdate(ctx, ew1.updateCh); err != nil {
t.Fatal(err)
}
if err := verifyNoEndpointsUpdate(ctx, updateCh2); err != nil {
if err := verifyNoEndpointsUpdate(ctx, ew2.updateCh); err != nil {
t.Fatal(err)
}
@ -411,10 +430,10 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
if err := verifyEndpointsUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil {
if err := verifyEndpointsUpdate(ctx, ew1.updateCh, test.wantUpdateV2); err != nil {
t.Fatal(err)
}
if err := verifyNoEndpointsUpdate(ctx, updateCh2); err != nil {
if err := verifyNoEndpointsUpdate(ctx, ew2.updateCh); err != nil {
t.Fatal(err)
}
})
@ -442,22 +461,16 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Register two watches for the same endpoint resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
edsCancel1 := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh1.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew1 := newEndpointsWatcher()
edsCancel1 := xdsresource.WatchEndpoints(client, edsName, ew1)
defer edsCancel1()
updateCh2 := testutils.NewChannel()
edsCancel2 := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh2.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew2 := newEndpointsWatcher()
edsCancel2 := xdsresource.WatchEndpoints(client, edsName, ew2)
defer edsCancel2()
// Register the third watch for a different endpoint resource.
updateCh3 := testutils.NewChannel()
edsCancel3 := client.WatchEndpoints(edsNameNewStyle, func(u xdsresource.EndpointsUpdate, err error) {
updateCh3.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew3 := newEndpointsWatcher()
edsCancel3 := xdsresource.WatchEndpoints(client, edsNameNewStyle, ew3)
defer edsCancel3()
// Configure the management server to return two endpoint resources,
@ -479,8 +492,8 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// Verify the contents of the received update for the all watchers. The two
// resources returned differ only in the resource name. Therefore the
// expected update is the same for all the watchers.
wantUpdate := xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate := endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -495,13 +508,13 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
},
},
}
if err := verifyEndpointsUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyEndpointsUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyEndpointsUpdate(ctx, updateCh3, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew3.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
@ -544,10 +557,8 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
// Register a watch for an endpoint resource and have the watch callback
// push the received update on to a channel.
updateCh1 := testutils.NewChannel()
edsCancel1 := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh1.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew1 := newEndpointsWatcher()
edsCancel1 := xdsresource.WatchEndpoints(client, edsName, ew1)
defer edsCancel1()
// Configure the management server to return a single endpoint resource,
@ -564,8 +575,8 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate := endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -580,7 +591,7 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
},
},
}
if err := verifyEndpointsUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
select {
@ -591,12 +602,10 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
// Register another watch for the same resource. This should get the update
// from the cache.
updateCh2 := testutils.NewChannel()
edsCancel2 := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh2.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew2 := newEndpointsWatcher()
edsCancel2 := xdsresource.WatchEndpoints(client, edsName, ew2)
defer edsCancel2()
if err := verifyEndpointsUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
@ -633,10 +642,8 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
updateCh := testutils.NewChannel()
edsCancel := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew := newEndpointsWatcher()
edsCancel := xdsresource.WatchEndpoints(client, edsName, ew)
defer edsCancel()
// Wait for the watch expiry timer to fire.
@ -646,7 +653,7 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyEndpointsUpdate(ctx, updateCh, xdsresource.EndpointsUpdateErrTuple{Err: wantErr}); err != nil {
if err := verifyEndpointsUpdate(ctx, ew.updateCh, endpointsUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
}
@ -676,10 +683,8 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Register a watch for an endpoint resource and have the watch callback
// push the received update on to a channel.
updateCh := testutils.NewChannel()
edsCancel := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew := newEndpointsWatcher()
edsCancel := xdsresource.WatchEndpoints(client, edsName, ew)
defer edsCancel()
// Configure the management server to return a single endpoint resource,
@ -696,8 +701,8 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
}
// Verify the contents of the received update.
wantUpdate := xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate := endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -712,14 +717,14 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
},
},
}
if err := verifyEndpointsUpdate(ctx, updateCh, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Wait for the watch expiry timer to fire, and verify that the callback is
// not invoked.
<-time.After(defaultTestWatchExpiryTimeout)
if err := verifyNoEndpointsUpdate(ctx, updateCh); err != nil {
if err := verifyNoEndpointsUpdate(ctx, ew.updateCh); err != nil {
t.Fatal(err)
}
}
@ -741,12 +746,8 @@ func (s) TestEDSWatch_NACKError(t *testing.T) {
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
updateCh := testutils.NewChannel()
edsCancel := client.WatchEndpoints(edsName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh.SendContext(ctx, xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew := newEndpointsWatcher()
edsCancel := xdsresource.WatchEndpoints(client, edsName, ew)
defer edsCancel()
// Configure the management server to return a single route configuration
@ -756,16 +757,18 @@ func (s) TestEDSWatch_NACKError(t *testing.T) {
Endpoints: []*v3endpointpb.ClusterLoadAssignment{badEndpointsResource(edsName, edsHost1, []uint32{edsPort1})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Verify that the expected error is propagated to the watcher.
u, err := updateCh.Receive(ctx)
u, err := ew.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for an endpoints resource from the management server: %v", err)
}
gotErr := u.(xdsresource.EndpointsUpdateErrTuple).Err
gotErr := u.(endpointsUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantEndpointsNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantEndpointsNACKErr)
}
@ -791,19 +794,13 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) {
// Register two watches for two endpoint resources. The first watch is
// expected to receive an error because the received resource is NACKed.
// The second watch is expected to get a good update.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
badResourceName := rdsName
updateCh1 := testutils.NewChannel()
edsCancel1 := client.WatchEndpoints(badResourceName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh1.SendContext(ctx, xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew1 := newEndpointsWatcher()
edsCancel1 := xdsresource.WatchEndpoints(client, badResourceName, ew1)
defer edsCancel1()
goodResourceName := ldsNameNewStyle
updateCh2 := testutils.NewChannel()
edsCancel2 := client.WatchEndpoints(goodResourceName, func(u xdsresource.EndpointsUpdate, err error) {
updateCh2.SendContext(ctx, xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
ew2 := newEndpointsWatcher()
edsCancel2 := xdsresource.WatchEndpoints(client, goodResourceName, ew2)
defer edsCancel2()
// Configure the management server to return two endpoints resources,
@ -816,24 +813,26 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) {
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
u, err := updateCh1.Receive(ctx)
u, err := ew1.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for an endpoints resource from the management server: %v", err)
}
gotErr := u.(xdsresource.EndpointsUpdateErrTuple).Err
gotErr := u.(endpointsUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantEndpointsNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantEndpointsNACKErr)
}
// Verify that the watcher watching the good resource receives an update.
wantUpdate := xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate := endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
@ -848,7 +847,7 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) {
},
},
}
if err := verifyEndpointsUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}

View File

@ -275,16 +275,12 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) {
// Register two watches for endpoint resources with the same query string,
// but context parameters in different order.
updateCh1 := testutils.NewChannel()
cdsCancel1 := client.WatchEndpoints(resourceName1, func(u xdsresource.EndpointsUpdate, err error) {
updateCh1.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
defer cdsCancel1()
updateCh2 := testutils.NewChannel()
cdsCancel2 := client.WatchEndpoints(resourceName2, func(u xdsresource.EndpointsUpdate, err error) {
updateCh2.Send(xdsresource.EndpointsUpdateErrTuple{Update: u, Err: err})
})
defer cdsCancel2()
ew1 := newEndpointsWatcher()
edsCancel1 := xdsresource.WatchEndpoints(client, resourceName1, ew1)
defer edsCancel1()
ew2 := newEndpointsWatcher()
edsCancel2 := xdsresource.WatchEndpoints(client, resourceName2, ew2)
defer edsCancel2()
// Configure the management server for the non-default authority to return a
// single endpoints resource, corresponding to the watches registered.
@ -299,8 +295,8 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
wantUpdate := xdsresource.EndpointsUpdateErrTuple{
Update: xdsresource.EndpointsUpdate{
wantUpdate := endpointsUpdateErrTuple{
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: "localhost:666", Weight: 1}},
@ -315,10 +311,10 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) {
},
}
// Verify the contents of the received update.
if err := verifyEndpointsUpdate(ctx, updateCh1, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
if err := verifyEndpointsUpdate(ctx, updateCh2, wantUpdate); err != nil {
if err := verifyEndpointsUpdate(ctx, ew2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}

View File

@ -1072,18 +1072,10 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
// received by the resource watch callback.
type updateAndErr struct {
update xdsresource.EndpointsUpdate
err error
}
updateAndErrCh := testutils.NewChannel()
// Register a watch, and push the results on to a channel.
client.WatchEndpoints(test.resourceName, func(update xdsresource.EndpointsUpdate, err error) {
updateAndErrCh.Send(updateAndErr{update: update, err: err})
})
ew := newEndpointsWatcher()
edsCancel := xdsresource.WatchEndpoints(client, test.resourceName, ew)
defer edsCancel()
t.Logf("Registered a watch for Endpoint %q", test.resourceName)
// Wait for the discovery request to be sent out.
@ -1109,12 +1101,12 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
// Wait for an update from the xDS client and compare with expected
// update.
val, err = updateAndErrCh.Receive(ctx)
val, err = ew.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err)
}
gotUpdate := val.(updateAndErr).update
gotErr := val.(updateAndErr).err
gotUpdate := val.(endpointsUpdateErrTuple).update
gotErr := val.(endpointsUpdateErrTuple).err
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}

View File

@ -73,11 +73,3 @@ type EndpointsUpdate struct {
// Raw is the resource from the xds response.
Raw *anypb.Any
}
// EndpointsUpdateErrTuple is a tuple with the update and error. It contains the
// results from unmarshal functions. It's used to pass unmarshal results of
// multiple resources together, e.g. in maps like `map[string]{Update,error}`.
type EndpointsUpdateErrTuple struct {
Update EndpointsUpdate
Err error
}