diff --git a/balancer/weightedtarget/weightedaggregator/aggregator.go b/balancer/weightedtarget/weightedaggregator/aggregator.go index bcc8aca8b..fc929a2bf 100644 --- a/balancer/weightedtarget/weightedaggregator/aggregator.go +++ b/balancer/weightedtarget/weightedaggregator/aggregator.go @@ -26,6 +26,7 @@ package weightedaggregator import ( + "errors" "fmt" "sync" @@ -251,6 +252,14 @@ func (wbsa *Aggregator) buildAndUpdateLocked() { func (wbsa *Aggregator) build() balancer.State { wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState) + if len(wbsa.idToPickerState) == 0 { + // This is the case when all sub-balancers are removed. + return balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: base.NewErrPicker(errors.New("weighted-target: no targets to pick from")), + } + } + // Make sure picker's return error is consistent with the aggregatedState. pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState)) diff --git a/balancer/weightedtarget/weightedtarget_test.go b/balancer/weightedtarget/weightedtarget_test.go index acaae430f..c9c78fc5a 100644 --- a/balancer/weightedtarget/weightedtarget_test.go +++ b/balancer/weightedtarget/weightedtarget_test.go @@ -28,16 +28,23 @@ import ( "time" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/hierarchy" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/status" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" ) const ( @@ -163,6 +170,39 @@ func init() { 
NewRandomWRR = testutils.NewTestWRR } +// Tests the behavior of the weighted_target LB policy when there are no targets +// configured. It verifies that the LB policy sets the overall channel state to +// TRANSIENT_FAILURE and fails RPCs with an expected status code and message. +func (s) TestWeightedTarget_NoTargets(t *testing.T) { + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"weighted_target_experimental":{}}]}`), + } + cc, err := grpc.NewClient("passthrough:///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + cc.Connect() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + client := testgrpc.NewTestServiceClient(cc) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatal("EmptyCall() succeeded, want failure") + } + if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode { + t.Errorf("EmptyCall() failed with code = %v, want %s", gotCode, wantCode) + } + if gotMsg, wantMsg := err.Error(), "no targets to pick from"; !strings.Contains(gotMsg, wantMsg) { + t.Errorf("EmptyCall() failed with message = %q, want to contain %q", gotMsg, wantMsg) + } + if gotState, wantState := cc.GetState(), connectivity.TransientFailure; gotState != wantState { + t.Errorf("cc.GetState() = %v, want %v", gotState, wantState) + } +} + // TestWeightedTarget covers the cases that a sub-balancer is added and a // sub-balancer is removed. It verifies that the addresses and balancer configs // are forwarded to the right sub-balancer. This test is intended to test the @@ -326,6 +366,7 @@ func (s) TestWeightedTarget(t *testing.T) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3) } } + // Update the Weighted Target Balancer with an empty address list and no // targets. 
This should cause a Transient Failure State update to the Client // Conn. @@ -344,6 +385,11 @@ func (s) TestWeightedTarget(t *testing.T) { if state != connectivity.TransientFailure { t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state) } + p = <-cc.NewPickerCh + const wantErr = "no targets to pick from" + if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantErr) { + t.Fatalf("Pick() returned error: %v, want: %v", err, wantErr) + } } // TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we diff --git a/internal/internal.go b/internal/internal.go index b3ee00f58..13e1f386b 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -168,6 +168,20 @@ var ( // other features, including the CSDS service. NewXDSResolverWithPoolForTesting any // func(*xdsclient.Pool) (resolver.Builder, error) + // NewXDSResolverWithClientForTesting creates a new xDS resolver builder + // using the provided xDS client instead of creating a new one using the + // bootstrap configuration specified by the supported environment variables. + // The resolver.Builder is meant to be used in conjunction with the + // grpc.WithResolvers DialOption. The resolver.Builder does not take + // ownership of the provided xDS client and it is the responsibility of the + // caller to close the client when no longer required. + // + // Testing Only + // + // This function should ONLY be used for testing and may not work with some + // other features, including the CSDS service. + NewXDSResolverWithClientForTesting any // func(xdsclient.XDSClient) (resolver.Builder, error) + // RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster // Specifier Plugin for testing purposes, regardless of the XDSRLS environment // variable. 
diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 5029a3386..dfe3fcc4e 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -79,10 +79,24 @@ func newBuilderWithPoolForTesting(pool *xdsclient.Pool) (resolver.Builder, error }, nil } +// newBuilderWithClientForTesting creates a new xDS resolver builder that uses +// the provided xDS client, so that tests have complete control over the exact +// client instance being used. The close function it hands out is a no-op. +func newBuilderWithClientForTesting(client xdsclient.XDSClient) (resolver.Builder, error) { + return &xdsResolverBuilder{ + newXDSClient: func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) { + // Returning an empty close func here means that the responsibility + // of closing the client lies with the caller. + return client, func() {}, nil + }, + }, nil +} + func init() { resolver.Register(&xdsResolverBuilder{}) internal.NewXDSResolverWithConfigForTesting = newBuilderWithConfigForTesting internal.NewXDSResolverWithPoolForTesting = newBuilderWithPoolForTesting + internal.NewXDSResolverWithClientForTesting = newBuilderWithClientForTesting rinternal.NewWRR = wrr.NewRandom rinternal.NewXDSClient = xdsclient.DefaultPool.NewClient diff --git a/xds/test/eds_resource_missing_test.go b/xds/test/eds_resource_missing_test.go new file mode 100644 index 000000000..ce334b2cc --- /dev/null +++ b/xds/test/eds_resource_missing_test.go @@ -0,0 +1,188 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" + "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/xdsclient" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/xds" // To register the xDS resolver and LB policies. +) + +const ( + defaultTestWatchExpiryTimeout = 500 * time.Millisecond + defaultTestTimeout = 5 * time.Second +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// Tests the xDS-enabled gRPC channel's behavior when the management server +// does not send the EDS resource referenced by a Cluster resource. The +// expected outcome is an RPC failure with status code Unavailable and a +// message indicating that there are no targets to pick from. +func (s) TestEDS_MissingResource(t *testing.T) { + // Start an xDS management server. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + + // Create bootstrap configuration pointing to the above management server. 
+ nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + config, err := bootstrap.NewConfigFromContents(bc) + if err != nil { + t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err) + } + + // Create an xDS client with a short (500ms) watch expiry timeout. + pool := xdsclient.NewPool(config) + xdsC, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer xdsClose() + + // Create an xDS resolver for the test that uses the above xDS client. + resolverBuilder := internal.NewXDSResolverWithClientForTesting.(func(xdsclient.XDSClient) (resolver.Builder, error)) + xdsResolver, err := resolverBuilder(xdsC) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Create resources on the management server. No EDS resource is configured. + const serviceName = "my-service-client-side-xds" + const routeConfigName = "route-" + serviceName + const clusterName = "cluster-" + serviceName + const endpointsName = "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + SkipValidation: true, // Cluster resource refers to an EDS resource that is not configured. + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)}, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn with the xds:/// scheme. 
+ cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver)) + if err != nil { + t.Fatalf("Failed to create a grpc channel: %v", err) + } + defer cc.Close() + + // Make an RPC and verify that it fails with the expected error. + client := testgrpc.NewTestServiceClient(cc) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatal("EmptyCall() succeeded, want failure") + } + if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode { + t.Errorf("EmptyCall() failed with code = %v, want %s", gotCode, wantCode) + } + if gotMsg, wantMsg := err.Error(), "no targets to pick from"; !strings.Contains(gotMsg, wantMsg) { + t.Errorf("EmptyCall() failed with message = %q, want to contain %q", gotMsg, wantMsg) + } +} + +// Tests the xDS-enabled gRPC channel's behavior when the management server +// sends an EDS resource with no endpoints. The expected outcome is an RPC +// failure with status code Unavailable and a message indicating that there +// are no targets to pick from. +func (s) TestEDS_NoEndpointsInResource(t *testing.T) { + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) + + // Create resources on the management server, with the EDS resource + // containing no endpoints. + const serviceName = "my-service-client-side-xds" + const routeConfigName = "route-" + serviceName + const clusterName = "cluster-" + serviceName + const endpointsName = "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + SkipValidation: true, // EDS resource is configured but has no endpoints. TODO: confirm skipping validation is required here. 
+ Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{ + e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + }), + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn with the xds:/// scheme. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver)) + if err != nil { + t.Fatalf("Failed to create a grpc channel: %v", err) + } + defer cc.Close() + + // Make an RPC and verify that it fails with the expected error. + client := testgrpc.NewTestServiceClient(cc) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatal("EmptyCall() succeeded, want failure") + } + if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode { + t.Errorf("EmptyCall() failed with code = %v, want %s", gotCode, wantCode) + } + if gotMsg, wantMsg := err.Error(), "no targets to pick from"; !strings.Contains(gotMsg, wantMsg) { + t.Errorf("EmptyCall() failed with message = %q, want to contain %q", gotMsg, wantMsg) + } +}