xdsclient/test: deflake TestWatchResourceTimerCanRestartOnIgnoredADSRecvError (#6159)

This commit is contained in:
Arvind Bright 2023-04-06 13:29:59 -07:00 committed by GitHub
parent bfb57b8b49
commit 3489bb7d51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 15 deletions

View File

@ -31,6 +31,8 @@ type TestResourceWatcher struct {
UpdateCh chan *xdsresource.ResourceData
// ErrorCh is the channel on which errors from the xDS client are delivered.
ErrorCh chan error
// ResourceDoesNotExistCh is the channel used to indicate calls to OnResourceDoesNotExist
ResourceDoesNotExistCh chan struct{}
}
// OnUpdate is invoked by the xDS client to report an update on the resource
@ -52,7 +54,12 @@ func (w *TestResourceWatcher) OnError(err error) {
// OnResourceDoesNotExist is used by the xDS client to report that the resource
// being watched no longer exists.
func (w *TestResourceWatcher) OnResourceDoesNotExist() {}
func (w *TestResourceWatcher) OnResourceDoesNotExist() {
select {
case w.ResourceDoesNotExistCh <- struct{}{}:
default:
}
}
// NewTestResourceWatcher returns a TestResourceWatcher to watch for resources
// via the xDS client.

View File

@ -26,8 +26,10 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc/internal/grpcsync"
util "google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
@ -176,21 +178,39 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) {
// This tests the case where the ADS stream breaks after successfully receiving
// a message on the stream. The test performs the following:
// - configures the management server with resourceA.
// - configures the management server with the ability to dropRequests based on
// a boolean flag.
// - update the mgmt server with resourceA.
// - registers a watch for resourceA and verifies that the watcher's update
// callback is invoked.
// - registers a watch for resourceB and verifies that the watcher's update
// callback is not invoked. This is because the management server does not
// contain resourceB.
// - stops the management server to verify that the error propagated to the
// watcher is a connection error. This happens when the authority attempts
// to create a new stream.
// - force mgmt server to drop requests. Verify that watcher for resourceB gets
// connection error.
// - resume mgmt server to accept requests.
// - update the mgmt server with resourceB and verifies that the watcher's
// update callback is invoked.
func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create a restartable listener which can close existing connections.
l, err := util.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis := util.NewRestartableListener(l)
defer lis.Close()
streamRestarted := grpcsync.NewEvent()
serverOpt := e2e.ManagementServerOptions{
Listener: lis,
OnStreamClosed: func(int64, *v3corepb.Node) {
streamRestarted.Fire()
},
}
// Using a shorter expiry timeout to verify that the watch timeout was never fired.
a, ms, nodeID := setupTest(ctx, t, emptyServerOpts, defaultTestWatchExpiryTimeout)
a, ms, nodeID := setupTest(ctx, t, serverOpt, defaultTestTimeout)
defer ms.Stop()
defer a.close()
nameA := "xdsclient-test-lds-resourceA"
@ -211,6 +231,9 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
case <-watcherA.UpdateCh:
}
cancelA()
lis.Stop()
nameB := "xdsclient-test-lds-resourceB"
watcherB := testutils.NewTestResourceWatcher()
cancelB := a.watchResource(listenerResourceType, nameB, watcherB)
@ -218,14 +241,15 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
// Blocking on resource B watcher's error channel. This error should be due to
// connectivity issue when reconnecting because the mgmt server was already been
// stopped. ALl other errors or an update will fail the test.
cancelA()
ms.Stop()
// stopped. Also verifying that OnResourceDoesNotExist() method was not invoked
// on the watcher.
select {
case <-ctx.Done():
t.Fatal("Test timed out before mgmt server got the request.")
case u := <-watcherB.UpdateCh:
t.Fatalf("Watch got an unexpected resource update: %v.", u)
case <-watcherB.ResourceDoesNotExistCh:
t.Fatalf("Illegal invocation of OnResourceDoesNotExist() method on the watcher.")
case gotErr := <-watcherB.ErrorCh:
wantErr := xdsresource.ErrorTypeConnection
if xdsresource.ErrType(gotErr) != wantErr {
@ -233,13 +257,20 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
}
}
// Since there was already a response on the stream, the timer for resource B
// should not fire. If the timer did fire, watch state would be in `watchStateTimeout`.
<-time.After(defaultTestWatchExpiryTimeout)
if err := compareWatchState(a, nameB, watchStateStarted); err != nil {
t.Fatalf("Invalid watch state: %v.", err)
// Updating server with resource B and also re-enabling requests on the server.
if err := updateResourceInServer(ctx, ms, nameB, nodeID); err != nil {
t.Fatalf("Failed to update server with resource: %q; err: %q", nameB, err)
}
lis.Restart()
for {
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received the update.")
case <-watcherB.UpdateCh:
return
}
}
}
func compareWatchState(a *authority, rn string, wantState watchState) error {