mirror of https://github.com/grpc/grpc-go.git
356 lines
13 KiB
Go
356 lines
13 KiB
Go
/*
|
|
*
|
|
* Copyright 2024 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package xds_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/internal"
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
"google.golang.org/grpc/internal/grpctest"
|
|
"google.golang.org/grpc/internal/testutils"
|
|
"google.golang.org/grpc/internal/testutils/xds/e2e"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/grpc/xds"
|
|
xdsinternal "google.golang.org/grpc/xds/internal"
|
|
"google.golang.org/grpc/xds/internal/xdsclient"
|
|
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
|
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
|
|
|
|
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
|
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
|
|
"github.com/google/uuid"
|
|
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
|
testpb "google.golang.org/grpc/interop/grpc_testing"
|
|
)
|
|
|
|
var errAcceptAndClose = status.New(codes.Unavailable, "")
|
|
|
|
type s struct {
|
|
grpctest.Tester
|
|
}
|
|
|
|
func Test(t *testing.T) {
|
|
grpctest.RunSubTests(t, s{})
|
|
}
|
|
|
|
const (
|
|
defaultTestTimeout = 10 * time.Second
|
|
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
|
|
)
|
|
|
|
type testService struct {
|
|
testgrpc.UnimplementedTestServiceServer
|
|
}
|
|
|
|
func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
|
|
return &testpb.Empty{}, nil
|
|
}
|
|
|
|
func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
|
return &testpb.SimpleResponse{}, nil
|
|
}
|
|
|
|
func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
|
|
for {
|
|
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func hostPortFromListener(lis net.Listener) (string, uint32, error) {
|
|
host, p, err := net.SplitHostPort(lis.Addr().String())
|
|
if err != nil {
|
|
return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err)
|
|
}
|
|
port, err := strconv.ParseInt(p, 10, 32)
|
|
if err != nil {
|
|
return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err)
|
|
}
|
|
return host, uint32(port), nil
|
|
}
|
|
|
|
// TestServingModeChanges tests the Server's logic as it transitions from Not
|
|
// Ready to Ready, then to Not Ready. Before it goes Ready, connections should
|
|
// be accepted and closed. After it goes ready, RPC's should proceed as normal
|
|
// according to matched route configuration. After it transitions back into not
|
|
// ready (through an explicit LDS Resource Not Found), previously running RPC's
|
|
// should be gracefully closed and still work, and new RPC's should fail.
|
|
func (s) TestServingModeChanges(t *testing.T) {
|
|
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
|
|
|
|
// Create bootstrap configuration pointing to the above management server.
|
|
nodeID := uuid.New().String()
|
|
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
|
|
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
|
|
}
|
|
// Setup the management server to respond with a listener resource that
|
|
// specifies a route name to watch. Due to not having received the full
|
|
// configuration, this should cause the server to be in mode Serving.
|
|
host, port, err := hostPortFromListener(lis)
|
|
if err != nil {
|
|
t.Fatalf("failed to retrieve host and port of server: %v", err)
|
|
}
|
|
|
|
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
|
|
resources := e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: []*v3listenerpb.Listener{listener},
|
|
SkipValidation: true,
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
if err := managementServer.Update(ctx, resources); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
serving := grpcsync.NewEvent()
|
|
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
|
|
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
|
|
if args.Mode == connectivity.ServingModeServing {
|
|
serving.Fire()
|
|
}
|
|
})
|
|
|
|
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
|
|
}
|
|
defer server.Stop()
|
|
testgrpc.RegisterTestServiceServer(server, &testService{})
|
|
go func() {
|
|
if err := server.Serve(lis); err != nil {
|
|
t.Errorf("Serve() failed: %v", err)
|
|
}
|
|
}()
|
|
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("failed to dial local test server: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
|
|
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
|
|
resources = e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: []*v3listenerpb.Listener{listener},
|
|
Routes: []*v3routepb.RouteConfiguration{routeConfig},
|
|
}
|
|
defer cancel()
|
|
if err := managementServer.Update(ctx, resources); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("timeout waiting for the xDS Server to go Serving")
|
|
case <-serving.Done():
|
|
}
|
|
|
|
// A unary RPC should work once it transitions into serving. (need this same
|
|
// assertion from LDS resource not found triggering it).
|
|
waitForSuccessfulRPC(ctx, t, cc)
|
|
|
|
// Start a stream before switching the server to not serving. Due to the
|
|
// stream being created before the graceful stop of the underlying
|
|
// connection, it should be able to continue even after the server switches
|
|
// to not serving.
|
|
c := testgrpc.NewTestServiceClient(cc)
|
|
stream, err := c.FullDuplexCall(ctx)
|
|
if err != nil {
|
|
t.Fatalf("cc.FullDuplexCall failed: %f", err)
|
|
}
|
|
|
|
// Lookup the xDS client in use based on the dedicated well-known key, as
|
|
// defined in A71, used by the xDS enabled gRPC server.
|
|
xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer)
|
|
if err != nil {
|
|
t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents))
|
|
}
|
|
defer close()
|
|
|
|
// Invoke LDS Resource not found here (tests graceful close).
|
|
triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error)
|
|
listenerResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type)
|
|
if err := triggerResourceNotFound(xdsC, listenerResourceType, listener.GetName()); err != nil {
|
|
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
|
|
}
|
|
|
|
// New RPCs on that connection should eventually start failing. Due to
|
|
// Graceful Stop any started streams continue to work.
|
|
if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
|
|
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
|
|
}
|
|
if err = stream.CloseSend(); err != nil {
|
|
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
|
|
}
|
|
if _, err = stream.Recv(); err != io.EOF {
|
|
t.Fatalf("unexpected error: %v, expected an EOF error", err)
|
|
}
|
|
|
|
// New RPCs on that connection should eventually start failing.
|
|
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
|
|
}
|
|
|
|
// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
|
|
// returns resource not found. Before getting the resource not found, the xDS
|
|
// Server has not received all configuration needed, so it should Accept and
|
|
// Close any new connections. After it has received the resource not found
|
|
// error, the server should move to serving, successfully Accept Connections,
|
|
// and fail at the L7 level with resource not found specified.
|
|
func (s) TestResourceNotFoundRDS(t *testing.T) {
|
|
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
|
|
|
|
// Create bootstrap configuration pointing to the above management server.
|
|
nodeID := uuid.New().String()
|
|
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
|
|
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
|
|
}
|
|
// Setup the management server to respond with a listener resource that
|
|
// specifies a route name to watch, and no RDS resource corresponding to
|
|
// this route name.
|
|
host, port, err := hostPortFromListener(lis)
|
|
if err != nil {
|
|
t.Fatalf("failed to retrieve host and port of server: %v", err)
|
|
}
|
|
|
|
const routeConfigResourceName = "routeName"
|
|
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigResourceName)
|
|
resources := e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: []*v3listenerpb.Listener{listener},
|
|
SkipValidation: true,
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
if err := managementServer.Update(ctx, resources); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
serving := grpcsync.NewEvent()
|
|
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
|
|
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
|
|
if args.Mode == connectivity.ServingModeServing {
|
|
serving.Fire()
|
|
}
|
|
})
|
|
|
|
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
|
|
}
|
|
defer server.Stop()
|
|
testgrpc.RegisterTestServiceServer(server, &testService{})
|
|
go func() {
|
|
if err := server.Serve(lis); err != nil {
|
|
t.Errorf("Serve() failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("failed to dial local test server: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
|
|
|
|
// Lookup the xDS client in use based on the dedicated well-known key, as
|
|
// defined in A71, used by the xDS enabled gRPC server.
|
|
xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer)
|
|
if err != nil {
|
|
t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents))
|
|
}
|
|
defer close()
|
|
|
|
// Invoke resource not found - this should result in L7 RPC error with
|
|
// unavailable receive on serving as a result, should trigger it to go
|
|
// serving. Poll as watch might not be started yet to trigger resource not
|
|
// found.
|
|
triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error)
|
|
routeConfigResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type)
|
|
loop:
|
|
for {
|
|
if err := triggerResourceNotFound(xdsC, routeConfigResourceType, routeConfigResourceName); err != nil {
|
|
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
|
|
}
|
|
select {
|
|
case <-serving.Done():
|
|
break loop
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for serving mode to go serving")
|
|
case <-time.After(time.Millisecond):
|
|
}
|
|
}
|
|
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
|
|
}
|
|
|
|
func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
|
|
t.Helper()
|
|
|
|
c := testgrpc.NewTestServiceClient(cc)
|
|
if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
|
t.Fatalf("rpc EmptyCall() failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// waitForFailedRPCWithStatus makes unary RPC's until it receives the expected
|
|
// status in a polling manner. Fails if the RPC made does not return the
|
|
// expected status before the context expires.
|
|
func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) {
|
|
t.Helper()
|
|
|
|
c := testgrpc.NewTestServiceClient(cc)
|
|
ticker := time.NewTicker(10 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
var err error
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err)
|
|
case <-ticker.C:
|
|
_, err = c.EmptyCall(ctx, &testpb.Empty{})
|
|
if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
|
|
t.Logf("most recent error happy case: %v", err.Error())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|