xds: suppress redundant resource updates using proto.Equal (#4831)

Easwar Swaminathan 2021-10-05 16:55:25 -07:00 committed by GitHub
parent ee479e630f
commit 4bd9995351
7 changed files with 414 additions and 99 deletions
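
The core of the change is a guard on watcher notification: for each resource type (LDS, RDS, CDS, EDS), the client now compares the incoming raw resource proto against the previously cached one with proto.Equal and notifies watchers only when they differ. A minimal, self-contained sketch of that guard follows; the cache map, resource values, and notify callback are illustrative stand-ins for the client's per-type caches and watcher sets, not the actual xdsclient API.

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// notifyIfChanged notifies only on a first-time update or when the raw
// resource proto differs from the cached one.
func notifyIfChanged(cache map[string]*anypb.Any, name string, update *anypb.Any, notify func()) {
	if cur, ok := cache[name]; !ok || !proto.Equal(cur, update) {
		notify()
	}
	// The cache is synced unconditionally, mirroring the client code below.
	cache[name] = update
}

func main() {
	cache := make(map[string]*anypb.Any)
	res1, _ := anypb.New(wrapperspb.String("config-v1"))
	res2, _ := anypb.New(wrapperspb.String("config-v2"))

	for _, u := range []*anypb.Any{res1, res1, res2} {
		notifyIfChanged(cache, "listener-a", u, func() { fmt.Println("watchers notified") })
	}
	// Prints twice: the repeated res1 is suppressed, res2 is not.
}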


@@ -24,6 +24,7 @@ package xds_test
import (
"context"
"fmt"
"net"
"testing"
"time"
@@ -39,6 +40,118 @@ import (
"google.golang.org/grpc/xds/internal/testutils/e2e"
)
// TestServerSideXDS_RedundantUpdateSuppression tests the scenario where the
// control plane resends an identical resource update. It verifies that the
// mode change callback is not invoked and that client connections to the
// server are not recycled.
func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := setupManagementServer(t)
defer cleanup()
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
t.Fatal(err)
}
lis, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
updateCh := make(chan connectivity.ServingMode, 1)
// Create a server option to get notified about serving mode changes.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
updateCh <- args.Mode
})
// Initialize an xDS-enabled gRPC server and register the test service on it.
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
defer server.Stop()
testpb.RegisterTestServiceServer(server, &testService{})
// Set up the management server to respond with the listener resources.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
// Wait for the listener to move to "serving" mode.
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh:
if mode != connectivity.ServingModeServing {
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForSuccessfulRPC(ctx, t, cc)
// Start a goroutine to make sure that we do not see any connectivity state
// changes on the client connection. If redundant updates are not
// suppressed, the server will recycle client connections.
errCh := make(chan error, 1)
go func() {
if cc.WaitForStateChange(ctx, connectivity.Ready) {
errCh <- fmt.Errorf("unexpected connectivity state change {%s --> %s} on the client connection", connectivity.Ready, cc.GetState())
return
}
errCh <- nil
}()
// Update the management server with the same listener resource. This will
// update the resource version though, and should result in the management
// server sending the same resource to the xDS-enabled gRPC server.
if err := managementServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
}); err != nil {
t.Fatal(err)
}
// Since redundant resource updates are suppressed, we should not see the
// mode change callback being invoked.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case mode := <-updateCh:
t.Fatalf("unexpected mode change callback with new mode %v", mode)
}
// Make sure RPCs continue to succeed.
waitForSuccessfulRPC(ctx, t, cc)
// Cancel the context to ensure that the WaitForStateChange call exits early
// and returns false.
cancel()
if err := <-errCh; err != nil {
t.Fatal(err)
}
}
// TestServerSideXDS_ServingModeChanges tests the serving mode functionality in
// xDS-enabled gRPC servers. It verifies that appropriate mode changes happen in
// the server, and also verifies behavior of clientConns under these modes.
@@ -163,17 +276,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
t.Error(err)
}
// Wait for lis2 to move to "not-serving" mode. lis1 also receives an update
// here even though it stays in "serving" mode.
// See https://github.com/grpc/grpc-go/issues/4695.
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
// Wait for lis2 to move to "not-serving" mode.
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)


@@ -18,7 +18,10 @@
package xdsclient
import "google.golang.org/grpc/internal/pretty"
import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/protobuf/proto"
)
type watcherInfoWithUpdate struct {
wi *watchInfo
@@ -98,9 +101,13 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdateErrTuple, met
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(uErr.Update)
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first-time update or if it is different
// from the one currently cached.
if cur, ok := c.ldsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
for wi := range s {
wi.newUpdate(uErr.Update)
}
}
// Sync cache.
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
@@ -164,9 +171,13 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdateErrTupl
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(uErr.Update)
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first-time update or if it is different
// from the one currently cached.
if cur, ok := c.rdsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
for wi := range s {
wi.newUpdate(uErr.Update)
}
}
// Sync cache.
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
@@ -214,9 +225,13 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdateErrTuple, metad
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(uErr.Update)
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first-time update or if it is different
// from the one currently cached.
if cur, ok := c.cdsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
for wi := range s {
wi.newUpdate(uErr.Update)
}
}
// Sync cache.
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
@@ -281,9 +296,13 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdateErrTuple, me
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(uErr.Update)
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first-time update or if it is different
// from the one currently cached.
if cur, ok := c.edsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
for wi := range s {
wi.newUpdate(uErr.Update)
}
}
// Sync cache.
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))


@@ -26,6 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -181,7 +182,9 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
t.Fatal(err)
}
wantUpdate2 := ClusterUpdate{ClusterName: testEDSName + "2"}
// The second update needs to be different in the underlying resource proto
// for the watch callback to be invoked.
wantUpdate2 := ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}}
client.NewClusters(map[string]ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate2}}, UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil {
t.Fatal(err)
@@ -200,7 +203,7 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
}
return nil
}
if gotUpdate.Err != nil || !cmp.Equal(gotUpdate.Update, wantUpdate) {
if gotUpdate.Err != nil || !cmp.Equal(gotUpdate.Update, wantUpdate, protocmp.Transform()) {
return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.Update, gotUpdate.Err, wantUpdate)
}
return nil
@@ -218,7 +221,7 @@ func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, w
}
return nil
}
if gotUpdate.Err != nil || !cmp.Equal(gotUpdate.Update, wantUpdate) {
if gotUpdate.Err != nil || !cmp.Equal(gotUpdate.Update, wantUpdate, protocmp.Transform()) {
return fmt.Errorf("unexpected route config update: (%v, %v), want: (%v, nil)", gotUpdate.Update, gotUpdate.Err, wantUpdate)
}
return nil
@@ -236,7 +239,7 @@ func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
}
return nil
}
if !cmp.Equal(gotUpdate.Update, wantUpdate) {
if !cmp.Equal(gotUpdate.Update, wantUpdate, protocmp.Transform()) {
return fmt.Errorf("unexpected clusterUpdate: (%v, %v), want: (%v, nil)", gotUpdate.Update, gotUpdate.Err, wantUpdate)
}
return nil
@@ -254,7 +257,7 @@ func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wan
}
return nil
}
if gotUpdate.Err != nil || !cmp.Equal(gotUpdate.Update, wantUpdate, cmpopts.EquateEmpty()) {
if gotUpdate.Err != nil || !cmp.Equal(gotUpdate.Update, wantUpdate, cmpopts.EquateEmpty(), protocmp.Transform()) {
return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.Update, gotUpdate.Err, wantUpdate)
}
return nil
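
The verify helpers above now pass protocmp.Transform() to cmp.Equal because the expected updates carry a Raw *anypb.Any, and comparing a proto message with plain cmp.Equal panics on its unexported fields. A minimal sketch of the problem and the fix; the update struct here is illustrative, not the actual xdsclient type:

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"google.golang.org/protobuf/testing/protocmp"
	"google.golang.org/protobuf/types/known/anypb"
)

// update mimics a struct that embeds a proto message, like the xDS update
// structs compared in the tests.
type update struct {
	Name string
	Raw  *anypb.Any
}

func main() {
	a := update{Name: "x", Raw: &anypb.Any{TypeUrl: "t"}}
	b := update{Name: "x", Raw: &anypb.Any{TypeUrl: "t"}}
	// protocmp.Transform() rewrites proto messages into a comparable form;
	// without it, cmp.Equal would panic on anypb.Any's unexported fields.
	fmt.Println(cmp.Equal(a, b, protocmp.Transform())) // true
}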


@@ -24,6 +24,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/grpc/internal/testutils"
)
@@ -64,12 +65,16 @@ func (s) TestClusterWatch(t *testing.T) {
t.Fatal(err)
}
// Another update, with an extra resource for a different resource name.
// Push an update, with an extra resource for a different resource name.
// Specify a non-nil raw proto in the original resource to ensure that the
// new update is not considered equal to the old one.
newUpdate := wantUpdate
newUpdate.Raw = &anypb.Any{}
client.NewClusters(map[string]ClusterUpdateErrTuple{
testCDSName: {Update: wantUpdate},
testCDSName: {Update: newUpdate},
"randomName": {},
}, UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil {
if err := verifyClusterUpdate(ctx, clusterUpdateCh, newUpdate, nil); err != nil {
t.Fatal(err)
}
@@ -130,19 +135,28 @@ func (s) TestClusterTwoWatchSameResourceName(t *testing.T) {
}
}
// Cancel the last watch, and send update again.
// Cancel the last watch, and send update again. None of the watchers should
// be notified because one has been cancelled, and the other is receiving
// the same update.
cancelLastWatch()
client.NewClusters(map[string]ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate}}, UpdateMetadata{})
for i := 0; i < count-1; i++ {
if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate, nil); err != nil {
t.Fatal(err)
}
for i := 0; i < count; i++ {
func() {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := clusterUpdateChs[i].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ClusterUpdate: %v, %v, want channel recv timeout", u, err)
}
}()
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := clusterUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
// Push a new update and make sure the uncancelled watcher is invoked.
// Specify a non-nil raw proto to ensure that the new update is not
// considered equal to the old one.
newUpdate := ClusterUpdate{ClusterName: testEDSName, Raw: &anypb.Any{}}
client.NewClusters(map[string]ClusterUpdateErrTuple{testCDSName: {Update: newUpdate}}, UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateChs[0], newUpdate, nil); err != nil {
t.Fatal(err)
}
}
@@ -417,22 +431,26 @@ func (s) TestClusterResourceRemoved(t *testing.T) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
}
// Watcher 2 should get the same update again.
if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2, nil); err != nil {
t.Fatal(err)
// Watcher 2 should not see an update since the resource has not changed.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := clusterUpdateCh2.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ClusterUpdate: %v, want receiving from channel timeout", u)
}
// Send one more update without resource 1.
// Send another update with resource 2 modified. Specify a non-nil raw proto
// to ensure that the new update is not considered equal to the old one.
wantUpdate2 = ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}}
client.NewClusters(map[string]ClusterUpdateErrTuple{testCDSName + "2": {Update: wantUpdate2}}, UpdateMetadata{})
// Watcher 1 should not see an update.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := clusterUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
t.Errorf("unexpected Cluster: %v, want receiving from channel timeout", u)
}
// Watcher 2 should get the same update again.
// Watcher 2 should get the update.
if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2, nil); err != nil {
t.Fatal(err)
}


@@ -24,6 +24,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/xds/internal"
@@ -82,18 +83,23 @@ func (s) TestEndpointsWatch(t *testing.T) {
t.Fatal(err)
}
// Another update for a different resource name.
client.NewEndpoints(map[string]EndpointsUpdateErrTuple{"randomName": {}}, UpdateMetadata{})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
// Push an update, with an extra resource for a different resource name.
// Specify a non-nil raw proto in the original resource to ensure that the
// new update is not considered equal to the old one.
newUpdate := wantUpdate
newUpdate.Raw = &anypb.Any{}
client.NewEndpoints(map[string]EndpointsUpdateErrTuple{
testCDSName: {Update: newUpdate},
"randomName": {},
}, UpdateMetadata{})
if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, newUpdate, nil); err != nil {
t.Fatal(err)
}
// Cancel watch, and send update again.
cancelWatch()
client.NewEndpoints(map[string]EndpointsUpdateErrTuple{testCDSName: {Update: wantUpdate}}, UpdateMetadata{})
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
@@ -149,19 +155,28 @@ func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) {
}
}
// Cancel the last watch, and send update again.
// Cancel the last watch, and send update again. None of the watchers should
// be notified because one has been cancelled, and the other is receiving
// the same update.
cancelLastWatch()
client.NewEndpoints(map[string]EndpointsUpdateErrTuple{testCDSName: {Update: wantUpdate}}, UpdateMetadata{})
for i := 0; i < count-1; i++ {
if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate, nil); err != nil {
t.Fatal(err)
}
for i := 0; i < count; i++ {
func() {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := endpointsUpdateChs[i].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
}
}()
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := endpointsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
// Push a new update and make sure the uncancelled watcher is invoked.
// Specify a non-nil raw proto to ensure that the new update is not
// considered equal to the old one.
newUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}, Raw: &anypb.Any{}}
client.NewEndpoints(map[string]EndpointsUpdateErrTuple{testCDSName: {Update: newUpdate}}, UpdateMetadata{})
if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[0], newUpdate, nil); err != nil {
t.Fatal(err)
}
}


@@ -23,7 +23,10 @@ import (
"fmt"
"testing"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/protobuf/types/known/anypb"
)
// TestLDSWatch covers the cases:
@@ -62,12 +65,15 @@ func (s) TestLDSWatch(t *testing.T) {
t.Fatal(err)
}
// Another update, with an extra resource for a different resource name.
// Push an update, with an extra resource for a different resource name.
// Specify a non-nil raw proto in the original resource to ensure that the
// new update is not considered equal to the old one.
newUpdate := ListenerUpdate{RouteConfigName: testRDSName, Raw: &anypb.Any{}}
client.NewListeners(map[string]ListenerUpdateErrTuple{
testLDSName: {Update: wantUpdate},
testLDSName: {Update: newUpdate},
"randomName": {},
}, UpdateMetadata{})
if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate, nil); err != nil {
if err := verifyListenerUpdate(ctx, ldsUpdateCh, newUpdate, nil); err != nil {
t.Fatal(err)
}
@@ -77,7 +83,7 @@
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := ldsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
t.Fatalf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -131,19 +137,28 @@ func (s) TestLDSTwoWatchSameResourceName(t *testing.T) {
}
}
// Cancel the last watch, and send update again.
// Cancel the last watch, and send update again. None of the watchers should
// be notified because one has been cancelled, and the other is receiving
// the same update.
cancelLastWatch()
client.NewListeners(map[string]ListenerUpdateErrTuple{testLDSName: {Update: wantUpdate}}, UpdateMetadata{})
for i := 0; i < count-1; i++ {
if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate, nil); err != nil {
t.Fatal(err)
}
for i := 0; i < count; i++ {
func() {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := ldsUpdateChs[i].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
}
}()
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := ldsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
// Push a new update and make sure the uncancelled watcher is invoked.
// Specify a non-nil raw proto to ensure that the new update is not
// considered equal to the old one.
newUpdate := ListenerUpdate{RouteConfigName: testRDSName, Raw: &anypb.Any{}}
client.NewListeners(map[string]ListenerUpdateErrTuple{testLDSName: {Update: newUpdate}}, UpdateMetadata{})
if err := verifyListenerUpdate(ctx, ldsUpdateChs[0], newUpdate, nil); err != nil {
t.Fatal(err)
}
}
@@ -332,22 +347,26 @@ func (s) TestLDSResourceRemoved(t *testing.T) {
t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
}
// Watcher 2 should get the same update again.
if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2, nil); err != nil {
t.Fatal(err)
// Watcher 2 should not see an update since the resource has not changed.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := ldsUpdateCh2.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, want receiving from channel timeout", u)
}
// Send one more update without resource 1.
// Send another update with resource 2 modified. Specify a non-nil raw proto
// to ensure that the new update is not considered equal to the old one.
wantUpdate2 = ListenerUpdate{RouteConfigName: testEDSName + "2", Raw: &anypb.Any{}}
client.NewListeners(map[string]ListenerUpdateErrTuple{testLDSName + "2": {Update: wantUpdate2}}, UpdateMetadata{})
// Watcher 1 should not see an update.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := ldsUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, want receiving from channel timeout", u)
}
// Watcher 2 should get the same update again.
// Watcher 2 should get the update.
if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2, nil); err != nil {
t.Fatal(err)
}
@@ -448,3 +467,125 @@ func (s) TestListenerWatchPartialValid(t *testing.T) {
t.Fatal(err)
}
}
// TestListenerWatch_RedundantUpdateSupression tests scenarios where an update
// with an unmodified resource is suppressed, and one with a modified resource
// is not.
func (s) TestListenerWatch_RedundantUpdateSupression(t *testing.T) {
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
c, err := apiClientCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
apiClient := c.(*testAPIClient)
ldsUpdateCh := testutils.NewChannel()
client.WatchListener(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ListenerUpdateErrTuple{Update: update, Err: err})
})
if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
basicListener := testutils.MarshalAny(&v3listenerpb.Listener{
Name: testLDSName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{RouteConfigName: "route-config-name"},
},
}),
},
})
listenerWithFilter1 := testutils.MarshalAny(&v3listenerpb.Listener{
Name: testLDSName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{RouteConfigName: "route-config-name"},
},
HttpFilters: []*v3httppb.HttpFilter{
{
Name: "customFilter1",
ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: customFilterConfig},
},
},
}),
},
})
listenerWithFilter2 := testutils.MarshalAny(&v3listenerpb.Listener{
Name: testLDSName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{RouteConfigName: "route-config-name"},
},
HttpFilters: []*v3httppb.HttpFilter{
{
Name: "customFilter2",
ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: customFilterConfig},
},
},
}),
},
})
tests := []struct {
update ListenerUpdate
wantCallback bool
}{
{
// First update. Callback should be invoked.
update: ListenerUpdate{Raw: basicListener},
wantCallback: true,
},
{
// Same update as previous. Callback should be skipped.
update: ListenerUpdate{Raw: basicListener},
wantCallback: false,
},
{
// New update. Callback should be invoked.
update: ListenerUpdate{Raw: listenerWithFilter1},
wantCallback: true,
},
{
// Same update as previous. Callback should be skipped.
update: ListenerUpdate{Raw: listenerWithFilter1},
wantCallback: false,
},
{
// New update. Callback should be invoked.
update: ListenerUpdate{Raw: listenerWithFilter2},
wantCallback: true,
},
{
// Same update as previous. Callback should be skipped.
update: ListenerUpdate{Raw: listenerWithFilter2},
wantCallback: false,
},
}
for _, test := range tests {
client.NewListeners(map[string]ListenerUpdateErrTuple{testLDSName: {Update: test.update}}, UpdateMetadata{})
if test.wantCallback {
if err := verifyListenerUpdate(ctx, ldsUpdateCh, test.update, nil); err != nil {
t.Fatal(err)
}
} else {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := ldsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, want receiving from channel timeout", u)
}
}
}
}


@@ -24,6 +24,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/grpc/internal/testutils"
)
@@ -71,18 +72,23 @@ func (s) TestRDSWatch(t *testing.T) {
t.Fatal(err)
}
// Another update for a different resource name.
client.NewRouteConfigs(map[string]RouteConfigUpdateErrTuple{"randomName": {}}, UpdateMetadata{})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
// Push an update, with an extra resource for a different resource name.
// Specify a non-nil raw proto in the original resource to ensure that the
// new update is not considered equal to the old one.
newUpdate := wantUpdate
newUpdate.Raw = &anypb.Any{}
client.NewRouteConfigs(map[string]RouteConfigUpdateErrTuple{
testRDSName: {Update: newUpdate},
"randomName": {},
}, UpdateMetadata{})
if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, newUpdate, nil); err != nil {
t.Fatal(err)
}
// Cancel watch, and send update again.
cancelWatch()
client.NewRouteConfigs(map[string]RouteConfigUpdateErrTuple{testRDSName: {Update: wantUpdate}}, UpdateMetadata{})
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
@@ -145,19 +151,29 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
}
}
// Cancel the last watch, and send update again.
// Cancel the last watch, and send update again. None of the watchers should
// be notified because one has been cancelled, and the other is receiving
// the same update.
cancelLastWatch()
client.NewRouteConfigs(map[string]RouteConfigUpdateErrTuple{testRDSName: {Update: wantUpdate}}, UpdateMetadata{})
for i := 0; i < count-1; i++ {
if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate, nil); err != nil {
t.Fatal(err)
}
for i := 0; i < count; i++ {
func() {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := rdsUpdateChs[i].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
}
}()
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if u, err := rdsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
// Push a new update and make sure the uncancelled watcher is invoked.
// Specify a non-nil raw proto to ensure that the new update is not
// considered equal to the old one.
newUpdate := wantUpdate
newUpdate.Raw = &anypb.Any{}
client.NewRouteConfigs(map[string]RouteConfigUpdateErrTuple{testRDSName: {Update: newUpdate}}, UpdateMetadata{})
if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[0], newUpdate, nil); err != nil {
t.Fatal(err)
}
}