xds: local interop tests (#4823)

This commit is contained in:
Menghan Li 2021-10-07 11:47:53 -07:00 committed by GitHub
parent 404d8fd513
commit b9d7c74e01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 591 additions and 20 deletions

View File

@ -114,3 +114,4 @@ jobs:
examples/examples_test.sh
security/advancedtls/examples/examples_test.sh
interop/interop_test.sh
xds/internal/test/e2e/run.sh

View File

@ -43,15 +43,19 @@ import (
)
var (
port = flag.Int("port", 8080, "Listening port for test service")
maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
serverID = flag.String("server_id", "go_server", "Server ID included in response")
secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")
port = flag.Int("port", 8080, "Listening port for test service")
maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
serverID = flag.String("server_id", "go_server", "Server ID included in response")
secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")
hostNameOverride = flag.String("host_name_override", "", "If set, use this as the hostname instead of the real hostname")
logger = grpclog.Component("interop")
)
func getHostname() string {
if *hostNameOverride != "" {
return *hostNameOverride
}
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("failed to get hostname: %v", err)
@ -64,6 +68,7 @@ func getHostname() string {
type testServiceImpl struct {
testgrpc.UnimplementedTestServiceServer
hostname string
serverID string
}
func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
@ -73,7 +78,7 @@ func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*test
func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.SimpleResponse{ServerId: *serverID, Hostname: s.hostname}, nil
return &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}, nil
}
// xdsUpdateHealthServiceImpl provides an implementation of the
@ -108,7 +113,7 @@ func main() {
logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set")
}
testService := &testServiceImpl{hostname: getHostname()}
testService := &testServiceImpl{hostname: getHostname(), serverID: *serverID}
healthServer := health.NewServer()
updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer}

View File

@ -153,7 +153,7 @@ func init() {
clusterAnys[i] = testutils.MarshalAny(clusters[i])
}
for i := range edsTargets {
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i])
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
endpointAnys[i] = testutils.MarshalAny(endpoints[i])
}
}

View File

@ -0,0 +1,19 @@
Build client and server binaries.
```sh
go build -o ./binaries/client ../../../../interop/xds/client/
go build -o ./binaries/server ../../../../interop/xds/server/
```
Run the test
```sh
go test . -v
```
The client/server paths are flags
```sh
go test . -v -client=$HOME/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-client
```
Note that grpc logs are only turned on for Go.

View File

@ -0,0 +1,62 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package e2e
import (
"fmt"
"github.com/google/uuid"
xdsinternal "google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/xds/internal/testutils/e2e"
)
type controlPlane struct {
server *e2e.ManagementServer
nodeID string
bootstrapContent string
}
func newControlPlane(testName string) (*controlPlane, error) {
// Spin up an xDS management server on a local port.
server, err := e2e.StartManagementServer()
if err != nil {
return nil, fmt.Errorf("failed to spin up the xDS management server: %v", err)
}
nodeID := uuid.New().String()
bootstrapContentBytes, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{
Version: xdsinternal.TransportV3,
NodeID: nodeID,
ServerURI: server.Address,
ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
})
if err != nil {
server.Stop()
return nil, fmt.Errorf("failed to create bootstrap file: %v", err)
}
return &controlPlane{
server: server,
nodeID: nodeID,
bootstrapContent: string(bootstrapContentBytes),
}, nil
}
func (cp *controlPlane) stop() {
cp.server.Stop()
}

View File

@ -0,0 +1,176 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package e2e implements xds e2e tests using go-control-plane.
package e2e
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"google.golang.org/grpc"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
func cmd(path string, logger io.Writer, args []string, env []string) (*exec.Cmd, error) {
cmd := exec.Command(path, args...)
cmd.Env = append(os.Environ(), env...)
cmd.Stdout = logger
cmd.Stderr = logger
return cmd, nil
}
const (
clientStatsPort = 60363 // TODO: make this different per-test, only needed for parallel tests.
)
type client struct {
cmd *exec.Cmd
target string
statsCC *grpc.ClientConn
}
// newClient create a client with the given target and bootstrap content.
func newClient(target, binaryPath, bootstrap string, logger io.Writer, flags ...string) (*client, error) {
cmd, err := cmd(
binaryPath,
logger,
append([]string{
"--server=" + target,
"--print_response=true",
"--qps=100",
fmt.Sprintf("--stats_port=%d", clientStatsPort),
}, flags...), // Append any flags from caller.
[]string{
"GRPC_GO_LOG_VERBOSITY_LEVEL=99",
"GRPC_GO_LOG_SEVERITY_LEVEL=info",
"GRPC_XDS_BOOTSTRAP_CONFIG=" + bootstrap, // The bootstrap content doesn't need to be quoted.
},
)
if err != nil {
return nil, fmt.Errorf("failed to run client cmd: %v", err)
}
cmd.Start()
cc, err := grpc.Dial(fmt.Sprintf("localhost:%d", clientStatsPort), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))
if err != nil {
return nil, err
}
return &client{
cmd: cmd,
target: target,
statsCC: cc,
}, nil
}
func (c *client) clientStats(ctx context.Context) (*testpb.LoadBalancerStatsResponse, error) {
ccc := testpb.NewLoadBalancerStatsServiceClient(c.statsCC)
return ccc.GetClientStats(ctx, &testpb.LoadBalancerStatsRequest{
NumRpcs: 100,
TimeoutSec: 10,
})
}
func (c *client) configRPCs(ctx context.Context, req *testpb.ClientConfigureRequest) error {
ccc := testpb.NewXdsUpdateClientConfigureServiceClient(c.statsCC)
_, err := ccc.Configure(ctx, req)
return err
}
func (c *client) channelzSubChannels(ctx context.Context) ([]*channelzpb.Subchannel, error) {
ccc := channelzpb.NewChannelzClient(c.statsCC)
r, err := ccc.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{})
if err != nil {
return nil, err
}
var ret []*channelzpb.Subchannel
for _, cc := range r.Channel {
if cc.Data.Target != c.target {
continue
}
for _, sc := range cc.SubchannelRef {
rr, err := ccc.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: sc.SubchannelId})
if err != nil {
return nil, err
}
ret = append(ret, rr.Subchannel)
}
}
return ret, nil
}
func (c *client) stop() {
c.cmd.Process.Kill()
c.cmd.Wait()
}
const (
serverPort = 50051 // TODO: make this different per-test, only needed for parallel tests.
)
type server struct {
cmd *exec.Cmd
port int
}
// newServer creates multiple servers with the given bootstrap content.
//
// Each server gets a different hostname, in the format of
// <hostnamePrefix>-<index>.
func newServers(hostnamePrefix, binaryPath, bootstrap string, logger io.Writer, count int) (_ []*server, err error) {
var ret []*server
defer func() {
if err != nil {
for _, s := range ret {
s.stop()
}
}
}()
for i := 0; i < count; i++ {
port := serverPort + i
cmd, err := cmd(
binaryPath,
logger,
[]string{
fmt.Sprintf("--port=%d", port),
fmt.Sprintf("--host_name_override=%s-%d", hostnamePrefix, i),
},
[]string{
"GRPC_GO_LOG_VERBOSITY_LEVEL=99",
"GRPC_GO_LOG_SEVERITY_LEVEL=info",
"GRPC_XDS_BOOTSTRAP_CONFIG=" + bootstrap, // The bootstrap content doesn't need to be quoted.,
},
)
if err != nil {
return nil, fmt.Errorf("failed to run server cmd: %v", err)
}
cmd.Start()
ret = append(ret, &server{cmd: cmd, port: port})
}
return ret, nil
}
func (s *server) stop() {
s.cmd.Process.Kill()
s.cmd.Wait()
}

View File

@ -0,0 +1,262 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package e2e
import (
"bytes"
"context"
"flag"
"fmt"
"os"
"strconv"
"testing"
"time"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/xds/internal/testutils/e2e"
)
var (
clientPath = flag.String("client", "./binaries/client", "The interop client")
serverPath = flag.String("server", "./binaries/server", "The interop server")
)
func TestMain(m *testing.M) {
flag.Parse()
if _, err := os.Stat(*clientPath); os.IsNotExist(err) {
return
}
if _, err := os.Stat(*serverPath); os.IsNotExist(err) {
return
}
os.Exit(m.Run())
}
type testOpts struct {
testName string
backendCount int
clientFlags []string
}
func setup(t *testing.T, opts testOpts) (*controlPlane, *client, []*server) {
t.Helper()
backendCount := 1
if opts.backendCount != 0 {
backendCount = opts.backendCount
}
cp, err := newControlPlane(opts.testName)
if err != nil {
t.Fatalf("failed to start control-plane: %v", err)
}
t.Cleanup(cp.stop)
var clientLog bytes.Buffer
c, err := newClient(fmt.Sprintf("xds:///%s", opts.testName), *clientPath, cp.bootstrapContent, &clientLog, opts.clientFlags...)
if err != nil {
t.Fatalf("failed to start client: %v", err)
}
t.Cleanup(c.stop)
var serverLog bytes.Buffer
servers, err := newServers(opts.testName, *serverPath, cp.bootstrapContent, &serverLog, backendCount)
if err != nil {
t.Fatalf("failed to start server: %v", err)
}
t.Cleanup(func() {
for _, s := range servers {
s.stop()
}
})
t.Cleanup(func() {
// TODO: find a better way to print the log. They are long, and hide the failure.
t.Logf("\n----- client logs -----\n%v", clientLog.String())
t.Logf("\n----- server logs -----\n%v", serverLog.String())
})
return cp, c, servers
}
func TestPingPong(t *testing.T) {
const testName = "pingpong"
cp, c, _ := setup(t, testOpts{testName: testName})
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: testName,
NodeID: cp.nodeID,
Host: "localhost",
Port: serverPort,
SecLevel: e2e.SecurityLevelNone,
})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := cp.server.Update(ctx, resources); err != nil {
t.Fatalf("failed to update control plane resources: %v", err)
}
st, err := c.clientStats(ctx)
if err != nil {
t.Fatalf("failed to get client stats: %v", err)
}
if st.NumFailures != 0 {
t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
}
}
// TestAffinity covers the affinity tests with ringhash policy.
// - client is configured to use ringhash, with 3 backends
// - all RPCs will hash a specific metadata header
// - verify that
// - all RPCs with the same metadata value are sent to the same backend
// - only one backend is Ready
// - send more RPCs with different metadata values until a new backend is picked, and verify that
// - only two backends are in Ready
func TestAffinity(t *testing.T) {
const (
testName = "affinity"
backendCount = 3
testMDKey = "xds_md"
testMDValue = "unary_yranu"
)
cp, c, servers := setup(t, testOpts{
testName: testName,
backendCount: backendCount,
clientFlags: []string{"--rpc=EmptyCall", fmt.Sprintf("--metadata=EmptyCall:%s:%s", testMDKey, testMDValue)},
})
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: testName,
NodeID: cp.nodeID,
Host: "localhost",
Port: serverPort,
SecLevel: e2e.SecurityLevelNone,
})
// Update EDS to multiple backends.
var ports []uint32
for _, s := range servers {
ports = append(ports, uint32(s.port))
}
edsMsg := resources.Endpoints[0]
resources.Endpoints[0] = e2e.DefaultEndpoint(
edsMsg.ClusterName,
"localhost",
ports,
)
// Update CDS lbpolicy to ringhash.
cdsMsg := resources.Clusters[0]
cdsMsg.LbPolicy = v3clusterpb.Cluster_RING_HASH
// Update RDS to hash the header.
rdsMsg := resources.Routes[0]
rdsMsg.VirtualHosts[0].Routes[0].Action = &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: cdsMsg.Name},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: testMDKey,
},
},
}},
}}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := cp.server.Update(ctx, resources); err != nil {
t.Fatalf("failed to update control plane resources: %v", err)
}
// Note: We can skip CSDS check because there's no long delay as in TD.
//
// The client stats check doesn't race with the xds resource update because
// there's only one version of xds resource, updated at the beginning of the
// test. So there's no need to retry the stats call.
//
// In the future, we may add tests that update xds in the middle. Then we
// either need to retry clientStats(), or make a CSDS check before so the
// result is stable.
st, err := c.clientStats(ctx)
if err != nil {
t.Fatalf("failed to get client stats: %v", err)
}
if st.NumFailures != 0 {
t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
}
if len(st.RpcsByPeer) != 1 {
t.Fatalf("more than 1 backends got traffic: %v, want 1", st.RpcsByPeer)
}
// Call channelz to verify that only one subchannel is in state Ready.
scs, err := c.channelzSubChannels(ctx)
if err != nil {
t.Fatalf("failed to fetch channelz: %v", err)
}
verifySubConnStates(t, scs, map[channelzpb.ChannelConnectivityState_State]int{
channelzpb.ChannelConnectivityState_READY: 1,
channelzpb.ChannelConnectivityState_IDLE: 2,
})
// Send Unary call with different metadata value with integers starting from
// 0. Stop when a second peer is picked.
var (
diffPeerPicked bool
mdValue int
)
for !diffPeerPicked {
if err := c.configRPCs(ctx, &testpb.ClientConfigureRequest{
Types: []testpb.ClientConfigureRequest_RpcType{
testpb.ClientConfigureRequest_EMPTY_CALL,
testpb.ClientConfigureRequest_UNARY_CALL,
},
Metadata: []*testpb.ClientConfigureRequest_Metadata{
{Type: testpb.ClientConfigureRequest_EMPTY_CALL, Key: testMDKey, Value: testMDValue},
{Type: testpb.ClientConfigureRequest_UNARY_CALL, Key: testMDKey, Value: strconv.Itoa(mdValue)},
},
}); err != nil {
t.Fatalf("failed to configure RPC: %v", err)
}
st, err := c.clientStats(ctx)
if err != nil {
t.Fatalf("failed to get client stats: %v", err)
}
if st.NumFailures != 0 {
t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
}
if len(st.RpcsByPeer) == 2 {
break
}
mdValue++
}
// Call channelz to verify that only one subchannel is in state Ready.
scs2, err := c.channelzSubChannels(ctx)
if err != nil {
t.Fatalf("failed to fetch channelz: %v", err)
}
verifySubConnStates(t, scs2, map[channelzpb.ChannelConnectivityState_State]int{
channelzpb.ChannelConnectivityState_READY: 2,
channelzpb.ChannelConnectivityState_IDLE: 1,
})
}

View File

@ -0,0 +1,36 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package e2e
import (
"testing"
"github.com/google/go-cmp/cmp"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)
func verifySubConnStates(t *testing.T, scs []*channelzpb.Subchannel, want map[channelzpb.ChannelConnectivityState_State]int) {
t.Helper()
var scStatsCount = map[channelzpb.ChannelConnectivityState_State]int{}
for _, sc := range scs {
scStatsCount[sc.Data.State.State]++
}
if diff := cmp.Diff(scStatsCount, want); diff != "" {
t.Fatalf("got unexpected number of subchannels in state Ready, %v, scs: %v", diff, scs)
}
}

6
xds/internal/test/e2e/run.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/bash
mkdir binaries
go build -o ./binaries/client ../../../../interop/xds/client/
go build -o ./binaries/server ../../../../interop/xds/server/
go test . -v

View File

@ -93,7 +93,7 @@ func DefaultClientResources(params ResourceParams) UpdateOptions {
Listeners: []*v3listenerpb.Listener{DefaultClientListener(params.DialTarget, routeConfigName)},
Routes: []*v3routepb.RouteConfiguration{DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)},
Clusters: []*v3clusterpb.Cluster{DefaultCluster(clusterName, endpointsName, params.SecLevel)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{DefaultEndpoint(endpointsName, params.Host, params.Port)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
}
}
@ -358,21 +358,25 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
}
// DefaultEndpoint returns a basic xds Endpoint resource.
func DefaultEndpoint(clusterName string, host string, port uint32) *v3endpointpb.ClusterLoadAssignment {
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range ports {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
})
}
return &v3endpointpb.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: []*v3endpointpb.LbEndpoint{{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: uint32(port)}},
}},
}},
}},
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},