xdsclient_test: Avoid restarting listener in TestServerFailureMetrics_AfterResponseRecv (#8399)

Arjan Singh Bal 2025-06-17 10:02:42 +05:30 committed by GitHub
parent 9c62b1c9f1
commit e5de1e2cac
GPG Key ID: B5690EEEBB952194
3 changed files with 108 additions and 50 deletions
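In outline, the new approach avoids stopping and restarting the listener: the management server's OnStreamRequest callback returns an error when the client ACKs on the first stream (so the stream breaks only after a response has been received), and its OnStreamOpen callback holds new streams behind a one-token quota channel until the test has confirmed that no server-failure metric was emitted. Below is a minimal, self-contained sketch of that quota-channel gating pattern; the callback signature is simplified relative to the e2e.ManagementServerOptions hooks in the diff, and the names and timings are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// A capacity-1 channel pre-loaded with a single token acts as a quota:
	// the first receiver proceeds immediately, every later receiver blocks
	// until the channel is closed (after which receives never block).
	streamCreationQuota := make(chan struct{}, 1)
	streamCreationQuota <- struct{}{}

	// Simplified stand-in for the OnStreamOpen hook used in the test.
	onStreamOpen := func(ctx context.Context, streamID int64) error {
		select {
		case <-streamCreationQuota: // first stream consumes the token
		case <-ctx.Done(): // bail out if the overall deadline expires
		}
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	fmt.Println("stream 1:", onStreamOpen(ctx, 1)) // returns immediately

	go func() {
		// Elsewhere, the test verifies that the failure metric was NOT
		// emitted, then lifts the quota so the client can reconnect.
		time.Sleep(100 * time.Millisecond)
		close(streamCreationQuota)
	}()
	fmt.Println("stream 2:", onStreamOpen(ctx, 2)) // blocks until close(...)
}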

View File

@@ -709,7 +709,7 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
// does not receive an LDS response for the request that it sends. The test
// verifies that the watch callback is invoked with an error once the
// watchExpiryTimer fires.
func TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
nodeID := uuid.New().String()

View File

@@ -20,6 +20,7 @@ package xdsclient_test
import (
"context"
"errors"
"net"
"testing"
@@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/xds/internal/clients/xdsclient/metrics"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
// TestResourceUpdateMetrics configures an xDS client, and a management server
@@ -201,24 +203,49 @@ func (s) TestServerFailureMetrics_BeforeResponseRecv(t *testing.T) {
}
}
// TestServerFailureMetrics_AfterResponseRecv configures an xDS client, and a
// management server to send a valid LDS updates, and verifies that the
// resource update valid metric is emitted. It then closes the management server
// listener to close the ADS stream and verifies that the server failure metric
// is not emitted because the ADS stream was closed after having received a
// response on the stream.
// TestServerFailureMetrics_AfterResponseRecv configures an xDS client and a
// management server to send a valid LDS update, and verifies that the
// successful update metric is emitted. When the client ACKs the update, the
// server returns an error, breaking the stream. The test then verifies that the
// server failure metric is not emitted, because the ADS stream was closed after
// a response was received on the stream. Finally, the test waits for the client
// to establish a new stream and verifies that the client emits a metric after
// receiving a successful update.
func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
tmr := newTestMetricsReporter()
l, err := net.Listen("tcp", "localhost:0")
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis})
streamCreationQuota := make(chan struct{}, 1)
streamCreationQuota <- struct{}{}
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamOpen: func(context.Context, int64, string) error {
// The following select block is used to block stream creation after
// the first stream has failed, but while we are waiting to verify
// that the failure metric is not reported.
select {
case <-streamCreationQuota:
case <-ctx.Done():
}
return nil
},
OnStreamRequest: func(streamID int64, req *v3discoverypb.DiscoveryRequest) error {
// We only want the ACK on the first stream to return an error
// (leading to stream closure), without affecting subsequent stream
// attempts.
if streamID == 1 && req.GetVersionInfo() != "" {
return errors.New("test configured error")
}
return nil
}},
)
const listenerResourceName = "test-listener-resource"
const routeConfigurationName = "test-route-configuration-resource"
nodeID := uuid.New().String()
@@ -260,25 +287,27 @@ func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
defer client.Close()
// Watch the valid listener configured on the management server. This should
// cause a resource updates valid count to emit eventually.
// cause a resource update valid metric to emit eventually.
client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{})
if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil {
t.Fatal(err.Error())
}
// Close the listener and ensure that the ADS stream breaks. This should
// cause a server failure metric to emit eventually.
lis.Stop()
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for ADS stream to close")
}
// Restart to prevent the attempt to create a new ADS stream after back off.
lis.Restart()
// Server failure should not have emitted.
// When the client sends an ACK, the management server would reply with an
// error, breaking the stream.
// Server failure should still have no recording point.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := tmr.waitForMetric(sCtx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err == nil {
t.Fatal("tmr.WaitForInt64Count(ctx, mdWant) succeeded when expected to timeout.")
failureMetric := &metrics.ServerFailure{ServerURI: mgmtServer.Address}
if err := tmr.waitForMetric(sCtx, failureMetric); err == nil {
t.Fatalf("tmr.waitForMetric(%v) succeeded when expected to timeout.", failureMetric)
} else if sCtx.Err() == nil {
t.Fatalf("tmr.WaitForInt64Count(%v) = %v, want context deadline exceeded", failureMetric, err)
}
// Unblock stream creation and verify that an update is received
// successfully.
close(streamCreationQuota)
if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil {
t.Fatal(err.Error())
}
}

View File

@@ -21,6 +21,7 @@ package xdsclient
import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
@@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter.
)
@@ -236,12 +238,14 @@ func (s) TestServerFailureMetrics_BeforeResponseRecv(t *testing.T) {
}
}
// TestServerFailureMetrics_AfterResponseRecv configures an xDS client, and a
// management server to send a valid LDS updates, and verifies that the
// server failure metric is not emitted. It then closes the management server
// listener to close the ADS stream and verifies that the server failure metric
// is still not emitted because the the ADS stream was closed after having
// received a response on the stream.
// TestServerFailureMetrics_AfterResponseRecv configures an xDS client and a
// management server to send a valid LDS update, and verifies that the
// successful update metric is emitted. When the client ACKs the update, the
// server returns an error, breaking the stream. The test then verifies that the
// server failure metric is not emitted, because the ADS stream was closed after
// a response was received on the stream. Finally, the test waits for the client
// to establish a new stream and verifies that the client emits a metric after
// receiving a successful update.
func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -252,7 +256,31 @@ func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis})
streamCreationQuota := make(chan struct{}, 1)
streamCreationQuota <- struct{}{}
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamOpen: func(context.Context, int64, string) error {
// The following select block is used to block stream creation after
// the first stream has failed, but while we are waiting to verify
// that the failure metric is not reported.
select {
case <-streamCreationQuota:
case <-ctx.Done():
}
return nil
},
OnStreamRequest: func(streamID int64, req *v3discoverypb.DiscoveryRequest) error {
// We only want the ACK on the first stream to return an error
// (leading to stream closure), without affecting subsequent stream
// attempts.
if streamID == 1 && req.GetVersionInfo() != "" {
return errors.New("test configured error")
}
return nil
}},
)
const listenerResourceName = "test-listener-resource"
const routeConfigurationName = "test-route-configuration-resource"
nodeID := uuid.New().String()
@@ -284,49 +312,50 @@ func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
}
pool := NewPool(config)
client, close, err := pool.NewClientForTesting(OptionsForTesting{
client, closePool, err := pool.NewClientForTesting(OptionsForTesting{
Name: t.Name(),
MetricsRecorder: tmr,
})
if err != nil {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()
defer closePool()
// Watch the valid listener configured on the management server. This should
// cause a resource updates valid count to emit eventually.
xdsresource.WatchListener(client, listenerResourceName, noopListenerWatcher{})
mdWant := stats.MetricsData{
mdSuccess := stats.MetricsData{
Handle: xdsClientResourceUpdatesValidMetric.Descriptor(),
IntIncr: 1,
LabelKeys: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
LabelVals: []string{"Test/ServerFailureMetrics_AfterResponseRecv", mgmtServer.Address, "ListenerResource"},
}
if err := tmr.WaitForInt64Count(ctx, mdWant); err != nil {
if err := tmr.WaitForInt64Count(ctx, mdSuccess); err != nil {
t.Fatal(err.Error())
}
// Server failure should have no recording point.
if got, _ := tmr.Metric("grpc.xds_client.server_failure"); got != 0 {
t.Fatalf("Unexpected data for metric \"grpc.xds_client.server_failure\", got: %v, want: %v", got, 0)
}
// Close the listener and ensure that the ADS stream breaks. This should
// cause a server failure count to emit eventually.
lis.Stop()
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for ADS stream to close")
}
// Restart to prevent the attempt to create a new ADS stream after back off.
lis.Restart()
mdWant = stats.MetricsData{
// When the client sends an ACK, the management server would reply with an
// error, breaking the stream.
mdFailure := stats.MetricsData{
Handle: xdsClientServerFailureMetric.Descriptor(),
IntIncr: 1,
LabelKeys: []string{"grpc.target", "grpc.xds.server"},
LabelVals: []string{"Test/ServerFailureMetrics_AfterResponseRecv", mgmtServer.Address},
}
// Server failure should still have no recording point.
if err := tmr.WaitForInt64Count(ctx, mdWant); err == nil {
t.Fatal("tmr.WaitForInt64Count(ctx, mdWant) succeeded when expected to timeout.")
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := tmr.WaitForInt64Count(sCtx, mdFailure); err == nil {
t.Fatalf("tmr.WaitForInt64Count(%v) succeeded when expected to timeout.", mdFailure)
} else if sCtx.Err() == nil {
t.Fatalf("tmr.WaitForInt64Count(%v) = %v, want context deadline exceeded", mdFailure, err)
}
// Unblock stream creation and verify that an update is received
// successfully.
close(streamCreationQuota)
if err := tmr.WaitForInt64Count(ctx, mdSuccess); err != nil {
t.Fatal(err.Error())
}
}
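
Both copies of the test assert the absence of the server-failure metric the same way: they wait for it on a child context with a much shorter deadline and require the wait to time out. A stripped-down sketch of that idiom follows; waitForMetric here is a hypothetical stand-in for the test's metrics-reporter helpers (waitForMetric / WaitForInt64Count), and the channel-based reporter is purely illustrative.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForMetric is a hypothetical stand-in for the metrics reporter in the
// test: it blocks until the named metric arrives on ch or ctx expires.
func waitForMetric(ctx context.Context, ch <-chan string, name string) error {
	for {
		select {
		case got := <-ch:
			if got == name {
				return nil
			}
		case <-ctx.Done():
			return fmt.Errorf("timeout waiting for metric %q: %w", name, ctx.Err())
		}
	}
}

func main() {
	metrics := make(chan string) // nothing is ever sent: the metric is never emitted

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// A short child deadline bounds how long we wait for something that
	// should never happen. A nil error would mean the metric was emitted,
	// which is the failure case for the test.
	sCtx, sCancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer sCancel()
	if err := waitForMetric(sCtx, metrics, "grpc.xds_client.server_failure"); err == nil {
		fmt.Println("FAIL: metric emitted when none was expected")
	} else if sCtx.Err() != nil {
		fmt.Println("OK: wait timed out, metric was not emitted")
	}
}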