mirror of https://github.com/grpc/grpc-go.git
528 lines
19 KiB
Go
528 lines
19 KiB
Go
/*
|
|
*
|
|
* Copyright 2024 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package xds_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/google/uuid"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/internal/grpctest"
|
|
"google.golang.org/grpc/internal/stubserver"
|
|
"google.golang.org/grpc/internal/testutils"
|
|
"google.golang.org/grpc/internal/testutils/xds/e2e"
|
|
"google.golang.org/grpc/internal/xds/bootstrap"
|
|
"google.golang.org/grpc/peer"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/grpc/xds"
|
|
"google.golang.org/grpc/xds/internal/xdsclient"
|
|
|
|
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
|
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
|
|
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
|
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
|
testpb "google.golang.org/grpc/interop/grpc_testing"
|
|
)
|
|
|
|
type s struct {
|
|
grpctest.Tester
|
|
}
|
|
|
|
func Test(t *testing.T) {
|
|
grpctest.RunSubTests(t, s{})
|
|
}
|
|
|
|
const (
|
|
defaultTestTimeout = 10 * time.Second
|
|
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
|
|
)
|
|
|
|
func hostPortFromListener(lis net.Listener) (string, uint32, error) {
|
|
host, p, err := net.SplitHostPort(lis.Addr().String())
|
|
if err != nil {
|
|
return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err)
|
|
}
|
|
port, err := strconv.ParseInt(p, 10, 32)
|
|
if err != nil {
|
|
return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err)
|
|
}
|
|
return host, uint32(port), nil
|
|
}
|
|
|
|
// servingModeChangeHandler handles changes to the serving mode of an
|
|
// xDS-enabled gRPC server. It logs the changes and sends the new mode and any
|
|
// errors on appropriate channels for the test to consume.
|
|
type servingModeChangeHandler struct {
|
|
logger interface {
|
|
Logf(format string, args ...any)
|
|
}
|
|
// Access to the below fields are guarded by this mutex.
|
|
mu sync.Mutex
|
|
modeCh chan connectivity.ServingMode
|
|
errCh chan error
|
|
currentMode connectivity.ServingMode
|
|
currentErr error
|
|
}
|
|
|
|
func newServingModeChangeHandler(t *testing.T) *servingModeChangeHandler {
|
|
return &servingModeChangeHandler{
|
|
logger: t,
|
|
modeCh: make(chan connectivity.ServingMode, 1),
|
|
errCh: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
func (m *servingModeChangeHandler) modeChangeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
// Suppress pushing duplicate mode change and error if the mode is staying
|
|
// in NOT_SERVING and the error is the same.
|
|
//
|
|
// TODO(purnesh42h): Should we move this check to listener wrapper? This
|
|
// shouldn't happen in practice a lot. But we never know what kind of
|
|
// management servers users run.
|
|
if m.currentMode == args.Mode && m.currentMode == connectivity.ServingModeNotServing && m.currentErr.Error() == args.Err.Error() {
|
|
return
|
|
}
|
|
m.logger.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
|
|
m.modeCh <- args.Mode
|
|
m.currentMode = args.Mode
|
|
if args.Err != nil {
|
|
m.errCh <- args.Err
|
|
}
|
|
m.currentErr = args.Err
|
|
}
|
|
|
|
// createStubServer creates a new xDS-enabled gRPC server and returns a
|
|
// stubserver.StubServer that can be used for testing. The server is configured
|
|
// with the provided modeChangeOpt and xdsclient.Pool.
|
|
func createStubServer(t *testing.T, lis net.Listener, opts ...grpc.ServerOption) *stubserver.StubServer {
|
|
stub := &stubserver.StubServer{
|
|
Listener: lis,
|
|
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
|
return &testpb.Empty{}, nil
|
|
},
|
|
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
|
|
for {
|
|
if _, err := stream.Recv(); err == io.EOF {
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
},
|
|
}
|
|
server, err := xds.NewGRPCServer(opts...)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
|
|
}
|
|
stub.S = server
|
|
stubserver.StartTestService(t, stub)
|
|
t.Cleanup(stub.Stop)
|
|
return stub
|
|
}
|
|
|
|
// waitForSuccessfulRPC waits for an RPC to succeed, repeatedly calling the
|
|
// EmptyCall RPC on the provided client connection until the call succeeds.
|
|
// If the context is canceled or the expected error is not before the context
|
|
// timeout expires, the test will fail.
|
|
func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn, opts ...grpc.CallOption) {
|
|
t.Helper()
|
|
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("Timeout waiting for RPCs to succeed")
|
|
case <-time.After(defaultTestShortTimeout):
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, opts...); err == nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// waitForFailedRPCWithStatus waits for an RPC to fail with the expected status
|
|
// code, error message, and node ID. It repeatedly calls the EmptyCall RPC on
|
|
// the provided client connection until the error matches the expected values.
|
|
// If the context is canceled or the expected error is not before the context
|
|
// timeout expires, the test will fail.
|
|
func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr, wantNodeID string) {
|
|
t.Helper()
|
|
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
var err error
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("RPCs failed with most recent error: %v. Want status code %v, error: %s, node id: %s", err, wantCode, wantErr, wantNodeID)
|
|
case <-time.After(defaultTestShortTimeout):
|
|
_, err = client.EmptyCall(ctx, &testpb.Empty{})
|
|
if gotCode := status.Code(err); gotCode != wantCode {
|
|
continue
|
|
}
|
|
if gotErr := err.Error(); !strings.Contains(gotErr, wantErr) {
|
|
continue
|
|
}
|
|
if !strings.Contains(err.Error(), wantNodeID) {
|
|
continue
|
|
}
|
|
t.Logf("Most recent error happy case: %v", err.Error())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tests the basic scenario for an xDS enabled gRPC server.
|
|
//
|
|
// - Verifies that the xDS enabled gRPC server requests for the expected
|
|
// listener resource.
|
|
// - Once the listener resource is received from the management server, it
|
|
// verifies that the xDS enabled gRPC server requests for the appropriate
|
|
// route configuration name. Also verifies that at this point, the server has
|
|
// not yet started serving RPCs
|
|
// - Once the route configuration is received from the management server, it
|
|
// verifies that the server can serve RPCs successfully.
|
|
func (s) TestServer_Basic(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// Start an xDS management server.
|
|
listenerNamesCh := make(chan []string, 1)
|
|
routeNamesCh := make(chan []string, 1)
|
|
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
|
|
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
|
|
switch req.GetTypeUrl() {
|
|
case "type.googleapis.com/envoy.config.listener.v3.Listener":
|
|
select {
|
|
case listenerNamesCh <- req.GetResourceNames():
|
|
case <-ctx.Done():
|
|
}
|
|
case "type.googleapis.com/envoy.config.route.v3.RouteConfiguration":
|
|
select {
|
|
case routeNamesCh <- req.GetResourceNames():
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
AllowResourceSubset: true,
|
|
})
|
|
|
|
// Create bootstrap configuration pointing to the above management server.
|
|
nodeID := uuid.New().String()
|
|
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
|
|
|
|
// Create a listener on a local port to act as the xDS enabled gRPC server.
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("Failed to listen to local port: %v", err)
|
|
}
|
|
host, port, err := hostPortFromListener(lis)
|
|
if err != nil {
|
|
t.Fatalf("Failed to retrieve host and port of server: %v", err)
|
|
}
|
|
|
|
// Configure the managegement server with a listener resource for the above
|
|
// xDS enabled gRPC server.
|
|
const routeConfigName = "routeName"
|
|
resources := e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")},
|
|
SkipValidation: true,
|
|
}
|
|
if err := managementServer.Update(ctx, resources); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Start an xDS-enabled gRPC server with the above bootstrap configuration.
|
|
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
|
|
}
|
|
pool := xdsclient.NewPool(config)
|
|
modeChangeHandler := newServingModeChangeHandler(t)
|
|
modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback)
|
|
createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))
|
|
|
|
// Wait for the expected listener resource to be requested.
|
|
wantLisResourceNames := []string{fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("Timeout waiting for the expected listener resource to be requested")
|
|
case gotLisResourceName := <-listenerNamesCh:
|
|
if !cmp.Equal(gotLisResourceName, wantLisResourceNames) {
|
|
t.Fatalf("Got unexpected listener resource names: %v, want %v", gotLisResourceName, wantLisResourceNames)
|
|
}
|
|
}
|
|
|
|
// Wait for the expected route config resource to be requested.
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("Timeout waiting for the expected route config resource to be requested")
|
|
case gotRouteNames := <-routeNamesCh:
|
|
if !cmp.Equal(gotRouteNames, []string{routeConfigName}) {
|
|
t.Fatalf("Got unexpected route config resource names: %v, want %v", gotRouteNames, []string{routeConfigName})
|
|
}
|
|
}
|
|
|
|
// Ensure that the server is not serving RPCs yet.
|
|
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
|
defer sCancel()
|
|
select {
|
|
case <-sCtx.Done():
|
|
case <-modeChangeHandler.modeCh:
|
|
t.Fatal("Server started serving RPCs before the route config was received")
|
|
}
|
|
|
|
// Create a gRPC channel to the xDS enabled server.
|
|
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
// Ensure that the server isnt't serving RPCs successfully.
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.Unavailable {
|
|
t.Fatalf("EmptyCall() returned %v, want %v", err, codes.Unavailable)
|
|
}
|
|
|
|
// Configure the management server with the expected route config resource,
|
|
// and expext RPCs to succeed.
|
|
resources.Routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)}
|
|
if err := managementServer.Update(ctx, resources); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("Timeout waiting for the server to start serving RPCs")
|
|
case gotMode := <-modeChangeHandler.modeCh:
|
|
if gotMode != connectivity.ServingModeServing {
|
|
t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
|
|
}
|
|
}
|
|
waitForSuccessfulRPC(ctx, t, cc)
|
|
}
|
|
|
|
// Tests that the xDS-enabled gRPC server cleans up all its resources when all
|
|
// connections to it are closed.
|
|
func (s) TestServer_ConnectionCleanup(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// Start an xDS management server.
|
|
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
|
|
|
|
// Create bootstrap configuration pointing to the above management server.
|
|
nodeID := uuid.New().String()
|
|
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
|
|
|
|
// Create a listener on a local port to act as the xDS enabled gRPC server.
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("Failed to listen to local port: %v", err)
|
|
}
|
|
host, port, err := hostPortFromListener(lis)
|
|
if err != nil {
|
|
t.Fatalf("Failed to retrieve host and port of server: %v", err)
|
|
}
|
|
|
|
// Configure the managegement server with a listener and route configuration
|
|
// resource for the above xDS enabled gRPC server.
|
|
const routeConfigName = "routeName"
|
|
resources := e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")},
|
|
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)},
|
|
SkipValidation: true,
|
|
}
|
|
if err := managementServer.Update(ctx, resources); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Start an xDS-enabled gRPC server with the above bootstrap configuration.
|
|
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
|
|
}
|
|
pool := xdsclient.NewPool(config)
|
|
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
|
|
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
|
|
})
|
|
createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))
|
|
|
|
// Create a gRPC channel and verify that RPCs succeed.
|
|
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
|
|
}
|
|
defer cc.Close()
|
|
waitForSuccessfulRPC(ctx, t, cc)
|
|
|
|
// Create multiple channels to the server, and make an RPC on each one. When
|
|
// everything is closed, the server should have cleaned up all its resources
|
|
// as well (and this will be verified by the leakchecker).
|
|
const numConns = 100
|
|
var wg sync.WaitGroup
|
|
wg.Add(numConns)
|
|
for range numConns {
|
|
go func() {
|
|
defer wg.Done()
|
|
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Errorf("grpc.NewClient failed with err: %v", err)
|
|
}
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
|
t.Errorf("EmptyCall() failed: %v", err)
|
|
}
|
|
cc.Close()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// Tests that multiple xDS-enabled gRPC servers can be created with different
|
|
// bootstrap configurations, and that they correctly request different LDS
|
|
// resources from the management server based on their respective listening
|
|
// ports. It also ensures that gRPC clients can connect to the intended server
|
|
// and that RPCs function correctly. The test uses the grpc.Peer() call option
|
|
// to validate that the client is connected to the correct server.
|
|
func (s) TestServer_MultipleServers_DifferentBootstrapConfigurations(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
|
|
|
|
// Create two bootstrap configurations pointing to the above management server.
|
|
nodeID1 := uuid.New().String()
|
|
bootstrapContents1 := e2e.DefaultBootstrapContents(t, nodeID1, mgmtServer.Address)
|
|
nodeID2 := uuid.New().String()
|
|
bootstrapContents2 := e2e.DefaultBootstrapContents(t, nodeID2, mgmtServer.Address)
|
|
|
|
// Create two xDS-enabled gRPC servers using the above bootstrap configs.
|
|
lis1, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
|
|
}
|
|
lis2, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
|
|
}
|
|
|
|
modeChangeHandler1 := newServingModeChangeHandler(t)
|
|
modeChangeOpt1 := xds.ServingModeCallback(modeChangeHandler1.modeChangeCallback)
|
|
modeChangeHandler2 := newServingModeChangeHandler(t)
|
|
modeChangeOpt2 := xds.ServingModeCallback(modeChangeHandler2.modeChangeCallback)
|
|
config1, err := bootstrap.NewConfigFromContents(bootstrapContents1)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents1), err)
|
|
}
|
|
pool1 := xdsclient.NewPool(config1)
|
|
config2, err := bootstrap.NewConfigFromContents(bootstrapContents2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents2), err)
|
|
}
|
|
pool2 := xdsclient.NewPool(config2)
|
|
createStubServer(t, lis1, modeChangeOpt1, xds.ClientPoolForTesting(pool1))
|
|
createStubServer(t, lis2, modeChangeOpt2, xds.ClientPoolForTesting(pool2))
|
|
|
|
// Update the management server with the listener resources pointing to the
|
|
// corresponding gRPC servers.
|
|
host1, port1, err := hostPortFromListener(lis1)
|
|
if err != nil {
|
|
t.Fatalf("Failed to retrieve host and port of server: %v", err)
|
|
}
|
|
host2, port2, err := hostPortFromListener(lis2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to retrieve host and port of server: %v", err)
|
|
}
|
|
|
|
resources1 := e2e.UpdateOptions{
|
|
NodeID: nodeID1,
|
|
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName")},
|
|
}
|
|
if err := mgmtServer.Update(ctx, resources1); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
resources2 := e2e.UpdateOptions{
|
|
NodeID: nodeID2,
|
|
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName")},
|
|
}
|
|
if err := mgmtServer.Update(ctx, resources2); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING")
|
|
case gotMode := <-modeChangeHandler1.modeCh:
|
|
if gotMode != connectivity.ServingModeServing {
|
|
t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING")
|
|
case gotMode := <-modeChangeHandler2.modeCh:
|
|
if gotMode != connectivity.ServingModeServing {
|
|
t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
|
|
}
|
|
}
|
|
|
|
// Create two gRPC clients, one for each server.
|
|
cc1, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client for test server 1: %s, %v", lis1.Addr().String(), err)
|
|
}
|
|
defer cc1.Close()
|
|
|
|
cc2, err := grpc.NewClient(lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client for test server 2: %s, %v", lis2.Addr().String(), err)
|
|
}
|
|
defer cc2.Close()
|
|
|
|
// Both unary RPCs should work once the servers transitions into serving.
|
|
var peer1 peer.Peer
|
|
waitForSuccessfulRPC(ctx, t, cc1, grpc.Peer(&peer1))
|
|
if peer1.Addr.String() != lis1.Addr().String() {
|
|
t.Errorf("Connected to wrong peer: %s, want %s", peer1.Addr, lis1.Addr())
|
|
}
|
|
|
|
var peer2 peer.Peer
|
|
waitForSuccessfulRPC(ctx, t, cc2, grpc.Peer(&peer2))
|
|
if peer2.Addr.String() != lis2.Addr().String() {
|
|
t.Errorf("Connected to wrong peer: %s, want %s", peer2.Addr, lis2.Addr())
|
|
}
|
|
}
|