grpc-go/xds/server_ext_test.go

356 lines
13 KiB
Go

/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test
import (
"context"
"fmt"
"io"
"net"
"strconv"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"github.com/google/uuid"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var errAcceptAndClose = status.New(codes.Unavailable, "")
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)
type testService struct {
testgrpc.UnimplementedTestServiceServer
}
func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}
func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
}
func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
}
func hostPortFromListener(lis net.Listener) (string, uint32, error) {
host, p, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err)
}
port, err := strconv.ParseInt(p, 10, 32)
if err != nil {
return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err)
}
return host, uint32(port), nil
}
// TestServingModeChanges tests the Server's logic as it transitions from Not
// Ready to Ready, then to Not Ready. Before it goes Ready, connections should
// be accepted and closed. After it goes ready, RPC's should proceed as normal
// according to matched route configuration. After it transitions back into not
// ready (through an explicit LDS Resource Not Found), previously running RPC's
// should be gracefully closed and still work, and new RPC's should fail.
func (s) TestServingModeChanges(t *testing.T) {
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch. Due to not having received the full
// configuration, this should cause the server to be in mode Serving.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("timeout waiting for the xDS Server to go Serving")
case <-serving.Done():
}
// A unary RPC should work once it transitions into serving. (need this same
// assertion from LDS resource not found triggering it).
waitForSuccessfulRPC(ctx, t, cc)
// Start a stream before switching the server to not serving. Due to the
// stream being created before the graceful stop of the underlying
// connection, it should be able to continue even after the server switches
// to not serving.
c := testgrpc.NewTestServiceClient(cc)
stream, err := c.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}
// Lookup the xDS client in use based on the dedicated well-known key, as
// defined in A71, used by the xDS enabled gRPC server.
xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer)
if err != nil {
t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents))
}
defer close()
// Invoke LDS Resource not found here (tests graceful close).
triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error)
listenerResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type)
if err := triggerResourceNotFound(xdsC, listenerResourceType, listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
// New RPCs on that connection should eventually start failing. Due to
// Graceful Stop any started streams continue to work.
if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
if err = stream.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}
// New RPCs on that connection should eventually start failing.
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}
// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns resource not found. Before getting the resource not found, the xDS
// Server has not received all configuration needed, so it should Accept and
// Close any new connections. After it has received the resource not found
// error, the server should move to serving, successfully Accept Connections,
// and fail at the L7 level with resource not found specified.
func (s) TestResourceNotFoundRDS(t *testing.T) {
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and no RDS resource corresponding to
// this route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
const routeConfigResourceName = "routeName"
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigResourceName)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
// Lookup the xDS client in use based on the dedicated well-known key, as
// defined in A71, used by the xDS enabled gRPC server.
xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer)
if err != nil {
t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents))
}
defer close()
// Invoke resource not found - this should result in L7 RPC error with
// unavailable receive on serving as a result, should trigger it to go
// serving. Poll as watch might not be started yet to trigger resource not
// found.
triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error)
routeConfigResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type)
loop:
for {
if err := triggerResourceNotFound(xdsC, routeConfigResourceType, routeConfigResourceName); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
select {
case <-serving.Done():
break loop
case <-ctx.Done():
t.Fatalf("timed out waiting for serving mode to go serving")
case <-time.After(time.Millisecond):
}
}
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}
func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
t.Helper()
c := testgrpc.NewTestServiceClient(cc)
if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
// waitForFailedRPCWithStatus makes unary RPC's until it receives the expected
// status in a polling manner. Fails if the RPC made does not return the
// expected status before the context expires.
func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) {
t.Helper()
c := testgrpc.NewTestServiceClient(cc)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
var err error
for {
select {
case <-ctx.Done():
t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err)
case <-ticker.C:
_, err = c.EmptyCall(ctx, &testpb.Empty{})
if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
t.Logf("most recent error happy case: %v", err.Error())
return
}
}
}
}