grpc-go/xds/server_ext_test.go

528 lines
19 KiB
Go

/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test
import (
"context"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
"google.golang.org/grpc/xds/internal/xdsclient"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)
func hostPortFromListener(lis net.Listener) (string, uint32, error) {
host, p, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err)
}
port, err := strconv.ParseInt(p, 10, 32)
if err != nil {
return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err)
}
return host, uint32(port), nil
}
// servingModeChangeHandler handles changes to the serving mode of an
// xDS-enabled gRPC server. It logs the changes and sends the new mode and any
// errors on appropriate channels for the test to consume.
type servingModeChangeHandler struct {
logger interface {
Logf(format string, args ...any)
}
// Access to the below fields are guarded by this mutex.
mu sync.Mutex
modeCh chan connectivity.ServingMode
errCh chan error
currentMode connectivity.ServingMode
currentErr error
}
func newServingModeChangeHandler(t *testing.T) *servingModeChangeHandler {
return &servingModeChangeHandler{
logger: t,
modeCh: make(chan connectivity.ServingMode, 1),
errCh: make(chan error, 1),
}
}
func (m *servingModeChangeHandler) modeChangeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
m.mu.Lock()
defer m.mu.Unlock()
// Suppress pushing duplicate mode change and error if the mode is staying
// in NOT_SERVING and the error is the same.
//
// TODO(purnesh42h): Should we move this check to listener wrapper? This
// shouldn't happen in practice a lot. But we never know what kind of
// management servers users run.
if m.currentMode == args.Mode && m.currentMode == connectivity.ServingModeNotServing && m.currentErr.Error() == args.Err.Error() {
return
}
m.logger.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
m.modeCh <- args.Mode
m.currentMode = args.Mode
if args.Err != nil {
m.errCh <- args.Err
}
m.currentErr = args.Err
}
// createStubServer creates a new xDS-enabled gRPC server and returns a
// stubserver.StubServer that can be used for testing. The server is configured
// with the provided modeChangeOpt and xdsclient.Pool.
func createStubServer(t *testing.T, lis net.Listener, opts ...grpc.ServerOption) *stubserver.StubServer {
stub := &stubserver.StubServer{
Listener: lis,
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
if _, err := stream.Recv(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
},
}
server, err := xds.NewGRPCServer(opts...)
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
stub.S = server
stubserver.StartTestService(t, stub)
t.Cleanup(stub.Stop)
return stub
}
// waitForSuccessfulRPC waits for an RPC to succeed, repeatedly calling the
// EmptyCall RPC on the provided client connection until the call succeeds.
// If the context is canceled or the expected error is not before the context
// timeout expires, the test will fail.
func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn, opts ...grpc.CallOption) {
t.Helper()
client := testgrpc.NewTestServiceClient(cc)
for {
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for RPCs to succeed")
case <-time.After(defaultTestShortTimeout):
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, opts...); err == nil {
return
}
}
}
}
// waitForFailedRPCWithStatus waits for an RPC to fail with the expected status
// code, error message, and node ID. It repeatedly calls the EmptyCall RPC on
// the provided client connection until the error matches the expected values.
// If the context is canceled or the expected error is not before the context
// timeout expires, the test will fail.
func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr, wantNodeID string) {
t.Helper()
client := testgrpc.NewTestServiceClient(cc)
var err error
for {
select {
case <-ctx.Done():
t.Fatalf("RPCs failed with most recent error: %v. Want status code %v, error: %s, node id: %s", err, wantCode, wantErr, wantNodeID)
case <-time.After(defaultTestShortTimeout):
_, err = client.EmptyCall(ctx, &testpb.Empty{})
if gotCode := status.Code(err); gotCode != wantCode {
continue
}
if gotErr := err.Error(); !strings.Contains(gotErr, wantErr) {
continue
}
if !strings.Contains(err.Error(), wantNodeID) {
continue
}
t.Logf("Most recent error happy case: %v", err.Error())
return
}
}
}
// Tests the basic scenario for an xDS enabled gRPC server.
//
// - Verifies that the xDS enabled gRPC server requests for the expected
// listener resource.
// - Once the listener resource is received from the management server, it
// verifies that the xDS enabled gRPC server requests for the appropriate
// route configuration name. Also verifies that at this point, the server has
// not yet started serving RPCs
// - Once the route configuration is received from the management server, it
// verifies that the server can serve RPCs successfully.
func (s) TestServer_Basic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start an xDS management server.
listenerNamesCh := make(chan []string, 1)
routeNamesCh := make(chan []string, 1)
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
switch req.GetTypeUrl() {
case "type.googleapis.com/envoy.config.listener.v3.Listener":
select {
case listenerNamesCh <- req.GetResourceNames():
case <-ctx.Done():
}
case "type.googleapis.com/envoy.config.route.v3.RouteConfiguration":
select {
case routeNamesCh <- req.GetResourceNames():
case <-ctx.Done():
}
}
return nil
},
AllowResourceSubset: true,
})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
// Create a listener on a local port to act as the xDS enabled gRPC server.
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to listen to local port: %v", err)
}
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("Failed to retrieve host and port of server: %v", err)
}
// Configure the managegement server with a listener resource for the above
// xDS enabled gRPC server.
const routeConfigName = "routeName"
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Start an xDS-enabled gRPC server with the above bootstrap configuration.
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
}
pool := xdsclient.NewPool(config)
modeChangeHandler := newServingModeChangeHandler(t)
modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback)
createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))
// Wait for the expected listener resource to be requested.
wantLisResourceNames := []string{fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))}
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for the expected listener resource to be requested")
case gotLisResourceName := <-listenerNamesCh:
if !cmp.Equal(gotLisResourceName, wantLisResourceNames) {
t.Fatalf("Got unexpected listener resource names: %v, want %v", gotLisResourceName, wantLisResourceNames)
}
}
// Wait for the expected route config resource to be requested.
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for the expected route config resource to be requested")
case gotRouteNames := <-routeNamesCh:
if !cmp.Equal(gotRouteNames, []string{routeConfigName}) {
t.Fatalf("Got unexpected route config resource names: %v, want %v", gotRouteNames, []string{routeConfigName})
}
}
// Ensure that the server is not serving RPCs yet.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-modeChangeHandler.modeCh:
t.Fatal("Server started serving RPCs before the route config was received")
}
// Create a gRPC channel to the xDS enabled server.
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
}
defer cc.Close()
// Ensure that the server isnt't serving RPCs successfully.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.Unavailable {
t.Fatalf("EmptyCall() returned %v, want %v", err, codes.Unavailable)
}
// Configure the management server with the expected route config resource,
// and expext RPCs to succeed.
resources.Routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for the server to start serving RPCs")
case gotMode := <-modeChangeHandler.modeCh:
if gotMode != connectivity.ServingModeServing {
t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
}
}
waitForSuccessfulRPC(ctx, t, cc)
}
// Tests that the xDS-enabled gRPC server cleans up all its resources when all
// connections to it are closed.
func (s) TestServer_ConnectionCleanup(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start an xDS management server.
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
// Create a listener on a local port to act as the xDS enabled gRPC server.
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to listen to local port: %v", err)
}
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("Failed to retrieve host and port of server: %v", err)
}
// Configure the managegement server with a listener and route configuration
// resource for the above xDS enabled gRPC server.
const routeConfigName = "routeName"
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Start an xDS-enabled gRPC server with the above bootstrap configuration.
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
}
pool := xdsclient.NewPool(config)
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
})
createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))
// Create a gRPC channel and verify that RPCs succeed.
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
}
defer cc.Close()
waitForSuccessfulRPC(ctx, t, cc)
// Create multiple channels to the server, and make an RPC on each one. When
// everything is closed, the server should have cleaned up all its resources
// as well (and this will be verified by the leakchecker).
const numConns = 100
var wg sync.WaitGroup
wg.Add(numConns)
for range numConns {
go func() {
defer wg.Done()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Errorf("grpc.NewClient failed with err: %v", err)
}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall() failed: %v", err)
}
cc.Close()
}()
}
wg.Wait()
}
// Tests that multiple xDS-enabled gRPC servers can be created with different
// bootstrap configurations, and that they correctly request different LDS
// resources from the management server based on their respective listening
// ports. It also ensures that gRPC clients can connect to the intended server
// and that RPCs function correctly. The test uses the grpc.Peer() call option
// to validate that the client is connected to the correct server.
func (s) TestServer_MultipleServers_DifferentBootstrapConfigurations(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
// Create two bootstrap configurations pointing to the above management server.
nodeID1 := uuid.New().String()
bootstrapContents1 := e2e.DefaultBootstrapContents(t, nodeID1, mgmtServer.Address)
nodeID2 := uuid.New().String()
bootstrapContents2 := e2e.DefaultBootstrapContents(t, nodeID2, mgmtServer.Address)
// Create two xDS-enabled gRPC servers using the above bootstrap configs.
lis1, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis2, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
modeChangeHandler1 := newServingModeChangeHandler(t)
modeChangeOpt1 := xds.ServingModeCallback(modeChangeHandler1.modeChangeCallback)
modeChangeHandler2 := newServingModeChangeHandler(t)
modeChangeOpt2 := xds.ServingModeCallback(modeChangeHandler2.modeChangeCallback)
config1, err := bootstrap.NewConfigFromContents(bootstrapContents1)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents1), err)
}
pool1 := xdsclient.NewPool(config1)
config2, err := bootstrap.NewConfigFromContents(bootstrapContents2)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents2), err)
}
pool2 := xdsclient.NewPool(config2)
createStubServer(t, lis1, modeChangeOpt1, xds.ClientPoolForTesting(pool1))
createStubServer(t, lis2, modeChangeOpt2, xds.ClientPoolForTesting(pool2))
// Update the management server with the listener resources pointing to the
// corresponding gRPC servers.
host1, port1, err := hostPortFromListener(lis1)
if err != nil {
t.Fatalf("Failed to retrieve host and port of server: %v", err)
}
host2, port2, err := hostPortFromListener(lis2)
if err != nil {
t.Fatalf("Failed to retrieve host and port of server: %v", err)
}
resources1 := e2e.UpdateOptions{
NodeID: nodeID1,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName")},
}
if err := mgmtServer.Update(ctx, resources1); err != nil {
t.Fatal(err)
}
resources2 := e2e.UpdateOptions{
NodeID: nodeID2,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName")},
}
if err := mgmtServer.Update(ctx, resources2); err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING")
case gotMode := <-modeChangeHandler1.modeCh:
if gotMode != connectivity.ServingModeServing {
t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING")
case gotMode := <-modeChangeHandler2.modeCh:
if gotMode != connectivity.ServingModeServing {
t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
}
}
// Create two gRPC clients, one for each server.
cc1, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to create client for test server 1: %s, %v", lis1.Addr().String(), err)
}
defer cc1.Close()
cc2, err := grpc.NewClient(lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to create client for test server 2: %s, %v", lis2.Addr().String(), err)
}
defer cc2.Close()
// Both unary RPCs should work once the servers transitions into serving.
var peer1 peer.Peer
waitForSuccessfulRPC(ctx, t, cc1, grpc.Peer(&peer1))
if peer1.Addr.String() != lis1.Addr().String() {
t.Errorf("Connected to wrong peer: %s, want %s", peer1.Addr, lis1.Addr())
}
var peer2 peer.Peer
waitForSuccessfulRPC(ctx, t, cc2, grpc.Peer(&peer2))
if peer2.Addr.String() != lis2.Addr().String() {
t.Errorf("Connected to wrong peer: %s, want %s", peer2.Addr, lis2.Addr())
}
}