diff --git a/test/xds/xds_client_ack_nack_test.go b/test/xds/xds_client_ack_nack_test.go index ca0ec56e2..ca954f8a3 100644 --- a/test/xds/xds_client_ack_nack_test.go +++ b/test/xds/xds_client_ack_nack_test.go @@ -39,14 +39,15 @@ import ( // xDS flow on the client. const wantResources = 4 -// seenAllACKs returns true if we have seen two streams with acks for all the -// resources that we are interested in. -func seenAllACKs(acks map[int64]map[string]string) bool { - if len(acks) != 2 { +// seenAllACKs returns true if the provided acksVersions map contains valid acks +// for all the resources that we are interested in. If `wantNonEmpty` is true, +// only non-empty ack versions are considered valid. +func seenAllACKs(acksVersions map[string]string, wantNonEmpty bool) bool { + if len(acksVersions) != wantResources { return false } - for _, v := range acks { - if len(v) != wantResources { + for _, ack := range acksVersions { + if wantNonEmpty && ack == "" { return false } } @@ -65,8 +66,19 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) { } lis := testutils.NewRestartableListener(l) - streamClosed := grpcsync.NewEvent() // Event to notify stream closure. - acksReceived := grpcsync.NewEvent() // Event to notify receipt of acks for all resources. + // We depend on the fact that the management server assigns monotonically + // increasing stream IDs starting at 1. + const ( + idBeforeRestart = 1 + idAfterRestart = 2 + ) + + // Events of importance in the test, in the order in which they are expected + // to happen. + acksReceivedBeforeRestart := grpcsync.NewEvent() + streamRestarted := grpcsync.NewEvent() + acksReceivedAfterRestart := grpcsync.NewEvent() + // Map from stream id to a map of resource type to resource version. 
ackVersionsMap := make(map[int64]map[string]string) managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{ @@ -78,7 +90,7 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) { // - Request contains no resource names. Such requests are usually // seen when the xdsclient is shutting down and is no longer // interested in the resources that it had subscribed to earlier. - if acksReceived.HasFired() || len(req.GetResourceNames()) == 0 { + if acksReceivedAfterRestart.HasFired() || len(req.GetResourceNames()) == 0 { return nil } // Create a stream specific map to store ack versions if this is the @@ -87,13 +99,25 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) { ackVersionsMap[id] = make(map[string]string) } ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo() - if seenAllACKs(ackVersionsMap) { - acksReceived.Fire() + // Prior to stream restart, we are interested only in non-empty + // resource versions. The xdsclient first sends out requests with an + // empty version string. After receipt of requested resource, it + // sends out another request for the same resource, but this time + // with a non-empty version string, to serve as an ACK. + if seenAllACKs(ackVersionsMap[idBeforeRestart], true) { + acksReceivedBeforeRestart.Fire() + } + // After stream restart, we expect the xdsclient to send out + // requests with version string set to the previously ACKed + // versions. If it sends out requests with empty version string, it + // is a bug and we want this test to catch it. 
+ if seenAllACKs(ackVersionsMap[idAfterRestart], false) { + acksReceivedAfterRestart.Fire() } return nil }, OnStreamClosed: func(int64) { - streamClosed.Fire() + streamRestarted.Fire() }, }) defer cleanup1() @@ -127,30 +151,34 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) { t.Fatalf("rpc EmptyCall() failed: %v", err) } - // A successful RPC means that we have captured the ack versions for all - // resources in the OnStreamRequest callback. Nothing more needs to be done - // here before stream restart. + // A successful RPC means that the xdsclient received all requested + // resources. The ACKs from the xdsclient may get a little delayed. So, we + // need to wait for all ACKs to be received on the management server before + // restarting the stream. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for all resources to be ACKed prior to stream restart") + case <-acksReceivedBeforeRestart.Done(): + } // Stop the listener on the management server. This will cause the client to // backoff and recreate the stream. lis.Stop() // Wait for the stream to be closed on the server. - <-streamClosed.Done() + <-streamRestarted.Done() // Restart the listener on the management server to be able to accept // reconnect attempts from the client. lis.Restart() // Wait for all the previously sent resources to be re-requested. - <-acksReceived.Done() + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for all resources to be ACKed post stream restart") + case <-acksReceivedAfterRestart.Done(): + } - // We depend on the fact that the management server assigns monotonically - // increasing stream IDs starting at 1. - const ( - idBeforeRestart = 1 - idAfterRestart = 2 - ) if diff := cmp.Diff(ackVersionsMap[idBeforeRestart], ackVersionsMap[idAfterRestart]); diff != "" { t.Fatalf("unexpected diff in ack versions before and after stream restart (-want, +got):\n%s", diff) }