mirror of https://github.com/grpc/grpc-go.git
test/xds: wait for all ACKs before forcing stream restart (#5500)
This commit is contained in:
parent
a094a1095c
commit
4f47c8c163
|
@ -39,14 +39,15 @@ import (
|
|||
// xDS flow on the client.
|
||||
const wantResources = 4
|
||||
|
||||
// seenAllACKs returns true if we have seen two streams with acks for all the
|
||||
// resources that we are interested in.
|
||||
func seenAllACKs(acks map[int64]map[string]string) bool {
|
||||
if len(acks) != 2 {
|
||||
// seenAllACKs returns true if the provided ackVersions map contains valid acks
|
||||
// for all the resources that we are interested in. If `wantNonEmpty` is true,
|
||||
// only non-empty ack versions are considered valid.
|
||||
func seenAllACKs(acksVersions map[string]string, wantNonEmpty bool) bool {
|
||||
if len(acksVersions) != wantResources {
|
||||
return false
|
||||
}
|
||||
for _, v := range acks {
|
||||
if len(v) != wantResources {
|
||||
for _, ack := range acksVersions {
|
||||
if wantNonEmpty && ack == "" {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -65,8 +66,19 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
|
|||
}
|
||||
lis := testutils.NewRestartableListener(l)
|
||||
|
||||
streamClosed := grpcsync.NewEvent() // Event to notify stream closure.
|
||||
acksReceived := grpcsync.NewEvent() // Event to notify receipt of acks for all resources.
|
||||
// We depend on the fact that the management server assigns monotonically
|
||||
// increasing stream IDs starting at 1.
|
||||
const (
|
||||
idBeforeRestart = 1
|
||||
idAfterRestart = 2
|
||||
)
|
||||
|
||||
// Events of importance in the test, in the order in which they are expected
|
||||
// to happen.
|
||||
acksReceivedBeforeRestart := grpcsync.NewEvent()
|
||||
streamRestarted := grpcsync.NewEvent()
|
||||
acksReceivedAfterRestart := grpcsync.NewEvent()
|
||||
|
||||
// Map from stream id to a map of resource type to resource version.
|
||||
ackVersionsMap := make(map[int64]map[string]string)
|
||||
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
|
||||
|
@ -78,7 +90,7 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
|
|||
// - Request contains no resource names. Such requests are usually
|
||||
// seen when the xdsclient is shutting down and is no longer
|
||||
// interested in the resources that it had subscribed to earlier.
|
||||
if acksReceived.HasFired() || len(req.GetResourceNames()) == 0 {
|
||||
if acksReceivedAfterRestart.HasFired() || len(req.GetResourceNames()) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Create a stream specific map to store ack versions if this is the
|
||||
|
@ -87,13 +99,25 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
|
|||
ackVersionsMap[id] = make(map[string]string)
|
||||
}
|
||||
ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo()
|
||||
if seenAllACKs(ackVersionsMap) {
|
||||
acksReceived.Fire()
|
||||
// Prior to stream restart, we are interested only in non-empty
|
||||
// resource versions. The xdsclient first sends out requests with an
|
||||
// empty version string. After receipt of requested resource, it
|
||||
// sends out another request for the same resource, but this time
|
||||
// with a non-empty version string, to serve as an ACK.
|
||||
if seenAllACKs(ackVersionsMap[idBeforeRestart], true) {
|
||||
acksReceivedBeforeRestart.Fire()
|
||||
}
|
||||
// After stream restart, we expect the xdsclient to send out
|
||||
// requests with version string set to the previously ACKed
|
||||
// versions. If it sends out requests with empty version string, it
|
||||
// is a bug and we want this test to catch it.
|
||||
if seenAllACKs(ackVersionsMap[idAfterRestart], false) {
|
||||
acksReceivedAfterRestart.Fire()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
OnStreamClosed: func(int64) {
|
||||
streamClosed.Fire()
|
||||
streamRestarted.Fire()
|
||||
},
|
||||
})
|
||||
defer cleanup1()
|
||||
|
@ -127,30 +151,34 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
|
|||
t.Fatalf("rpc EmptyCall() failed: %v", err)
|
||||
}
|
||||
|
||||
// A successful RPC means that we have captured the ack versions for all
|
||||
// resources in the OnStreamRequest callback. Nothing more needs to be done
|
||||
// here before stream restart.
|
||||
// A successful RPC means that the xdsclient received all requested
|
||||
// resources. The ACKs from the xdsclient may get a little delayed. So, we
|
||||
// need to wait for all ACKs to be received on the management server before
|
||||
// restarting the stream.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timeout when waiting for all resources to be ACKed prior to stream restart")
|
||||
case <-acksReceivedBeforeRestart.Done():
|
||||
}
|
||||
|
||||
// Stop the listener on the management server. This will cause the client to
|
||||
// backoff and recreate the stream.
|
||||
lis.Stop()
|
||||
|
||||
// Wait for the stream to be closed on the server.
|
||||
<-streamClosed.Done()
|
||||
<-streamRestarted.Done()
|
||||
|
||||
// Restart the listener on the management server to be able to accept
|
||||
// reconnect attempts from the client.
|
||||
lis.Restart()
|
||||
|
||||
// Wait for all the previously sent resources to be re-requested.
|
||||
<-acksReceived.Done()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timeout when waiting for all resources to be ACKed post stream restart")
|
||||
case <-acksReceivedAfterRestart.Done():
|
||||
}
|
||||
|
||||
// We depend on the fact that the management server assigns monotonically
|
||||
// increasing stream IDs starting at 1.
|
||||
const (
|
||||
idBeforeRestart = 1
|
||||
idAfterRestart = 2
|
||||
)
|
||||
if diff := cmp.Diff(ackVersionsMap[idBeforeRestart], ackVersionsMap[idAfterRestart]); diff != "" {
|
||||
t.Fatalf("unexpected diff in ack versions before and after stream restart (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue