diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 37544060e..b687fdb3d 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -114,3 +114,4 @@ jobs: examples/examples_test.sh security/advancedtls/examples/examples_test.sh interop/interop_test.sh + xds/internal/test/e2e/run.sh diff --git a/interop/xds/server/server.go b/interop/xds/server/server.go index 2f33799f9..afbbc56af 100644 --- a/interop/xds/server/server.go +++ b/interop/xds/server/server.go @@ -43,15 +43,19 @@ import ( ) var ( - port = flag.Int("port", 8080, "Listening port for test service") - maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port") - serverID = flag.String("server_id", "go_server", "Server ID included in response") - secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.") + port = flag.Int("port", 8080, "Listening port for test service") + maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port") + serverID = flag.String("server_id", "go_server", "Server ID included in response") + secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.") + hostNameOverride = flag.String("host_name_override", "", "If set, use this as the hostname instead of the real hostname") logger = grpclog.Component("interop") ) func getHostname() string { + if *hostNameOverride != "" { + return *hostNameOverride + } hostname, err := os.Hostname() if err != nil { log.Fatalf("failed to get hostname: %v", err) @@ -64,6 +68,7 @@ func getHostname() string { type testServiceImpl struct { testgrpc.UnimplementedTestServiceServer hostname string + serverID string } func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { @@ -73,7 +78,7 @@ func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*test func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname)) - return &testpb.SimpleResponse{ServerId: *serverID, Hostname: s.hostname}, nil + return &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}, nil } // xdsUpdateHealthServiceImpl provides an implementation of the @@ -108,7 +113,7 @@ func main() { logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set") } - testService := &testServiceImpl{hostname: getHostname()} + testService := &testServiceImpl{hostname: getHostname(), serverID: *serverID} healthServer := health.NewServer() updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer} diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index da7144f7e..9de83d37f 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -153,7 +153,7 @@ func init() { clusterAnys[i] = testutils.MarshalAny(clusters[i]) } for i := range edsTargets { - endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i]) + endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1]) endpointAnys[i] = testutils.MarshalAny(endpoints[i]) } } diff --git a/xds/internal/test/e2e/README.md b/xds/internal/test/e2e/README.md new file mode 100644 index 000000000..33cffa0da --- /dev/null +++ b/xds/internal/test/e2e/README.md @@ -0,0 +1,19 @@ +Build client and server binaries. + +```sh +go build -o ./binaries/client ../../../../interop/xds/client/ +go build -o ./binaries/server ../../../../interop/xds/server/ +``` + +Run the test + +```sh +go test . -v +``` + +The client/server paths are flags + +```sh +go test . -v -client=$HOME/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-client +``` +Note that grpc logs are only turned on for Go. diff --git a/xds/internal/test/e2e/controlplane.go b/xds/internal/test/e2e/controlplane.go new file mode 100644 index 000000000..247991b83 --- /dev/null +++ b/xds/internal/test/e2e/controlplane.go @@ -0,0 +1,62 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e + +import ( + "fmt" + + "github.com/google/uuid" + xdsinternal "google.golang.org/grpc/internal/xds" + "google.golang.org/grpc/xds/internal/testutils/e2e" +) + +type controlPlane struct { + server *e2e.ManagementServer + nodeID string + bootstrapContent string +} + +func newControlPlane(testName string) (*controlPlane, error) { + // Spin up an xDS management server on a local port. + server, err := e2e.StartManagementServer() + if err != nil { + return nil, fmt.Errorf("failed to spin up the xDS management server: %v", err) + } + + nodeID := uuid.New().String() + bootstrapContentBytes, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{ + Version: xdsinternal.TransportV3, + NodeID: nodeID, + ServerURI: server.Address, + ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, + }) + if err != nil { + server.Stop() + return nil, fmt.Errorf("failed to create bootstrap file: %v", err) + } + + return &controlPlane{ + server: server, + nodeID: nodeID, + bootstrapContent: string(bootstrapContentBytes), + }, nil +} + +func (cp *controlPlane) stop() { + cp.server.Stop() +} diff --git a/xds/internal/test/e2e/e2e.go b/xds/internal/test/e2e/e2e.go new file mode 100644 index 000000000..82c7f9dfb --- /dev/null +++ b/xds/internal/test/e2e/e2e.go @@ -0,0 +1,176 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package e2e implements xds e2e tests using go-control-plane. +package e2e + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + + "google.golang.org/grpc" + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +func cmd(path string, logger io.Writer, args []string, env []string) (*exec.Cmd, error) { + cmd := exec.Command(path, args...) + cmd.Env = append(os.Environ(), env...) + cmd.Stdout = logger + cmd.Stderr = logger + return cmd, nil +} + +const ( + clientStatsPort = 60363 // TODO: make this different per-test, only needed for parallel tests. +) + +type client struct { + cmd *exec.Cmd + + target string + statsCC *grpc.ClientConn +} + +// newClient create a client with the given target and bootstrap content. +func newClient(target, binaryPath, bootstrap string, logger io.Writer, flags ...string) (*client, error) { + cmd, err := cmd( + binaryPath, + logger, + append([]string{ + "--server=" + target, + "--print_response=true", + "--qps=100", + fmt.Sprintf("--stats_port=%d", clientStatsPort), + }, flags...), // Append any flags from caller. + []string{ + "GRPC_GO_LOG_VERBOSITY_LEVEL=99", + "GRPC_GO_LOG_SEVERITY_LEVEL=info", + "GRPC_XDS_BOOTSTRAP_CONFIG=" + bootstrap, // The bootstrap content doesn't need to be quoted. + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to run client cmd: %v", err) + } + cmd.Start() + + cc, err := grpc.Dial(fmt.Sprintf("localhost:%d", clientStatsPort), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.WaitForReady(true))) + if err != nil { + return nil, err + } + return &client{ + cmd: cmd, + target: target, + statsCC: cc, + }, nil +} + +func (c *client) clientStats(ctx context.Context) (*testpb.LoadBalancerStatsResponse, error) { + ccc := testpb.NewLoadBalancerStatsServiceClient(c.statsCC) + return ccc.GetClientStats(ctx, &testpb.LoadBalancerStatsRequest{ + NumRpcs: 100, + TimeoutSec: 10, + }) +} + +func (c *client) configRPCs(ctx context.Context, req *testpb.ClientConfigureRequest) error { + ccc := testpb.NewXdsUpdateClientConfigureServiceClient(c.statsCC) + _, err := ccc.Configure(ctx, req) + return err +} + +func (c *client) channelzSubChannels(ctx context.Context) ([]*channelzpb.Subchannel, error) { + ccc := channelzpb.NewChannelzClient(c.statsCC) + r, err := ccc.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{}) + if err != nil { + return nil, err + } + + var ret []*channelzpb.Subchannel + for _, cc := range r.Channel { + if cc.Data.Target != c.target { + continue + } + for _, sc := range cc.SubchannelRef { + rr, err := ccc.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: sc.SubchannelId}) + if err != nil { + return nil, err + } + ret = append(ret, rr.Subchannel) + } + } + return ret, nil +} + +func (c *client) stop() { + c.cmd.Process.Kill() + c.cmd.Wait() +} + +const ( + serverPort = 50051 // TODO: make this different per-test, only needed for parallel tests. +) + +type server struct { + cmd *exec.Cmd + port int +} + +// newServer creates multiple servers with the given bootstrap content. +// +// Each server gets a different hostname, in the format of +// -. +func newServers(hostnamePrefix, binaryPath, bootstrap string, logger io.Writer, count int) (_ []*server, err error) { + var ret []*server + defer func() { + if err != nil { + for _, s := range ret { + s.stop() + } + } + }() + for i := 0; i < count; i++ { + port := serverPort + i + cmd, err := cmd( + binaryPath, + logger, + []string{ + fmt.Sprintf("--port=%d", port), + fmt.Sprintf("--host_name_override=%s-%d", hostnamePrefix, i), + }, + []string{ + "GRPC_GO_LOG_VERBOSITY_LEVEL=99", + "GRPC_GO_LOG_SEVERITY_LEVEL=info", + "GRPC_XDS_BOOTSTRAP_CONFIG=" + bootstrap, // The bootstrap content doesn't need to be quoted., + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to run server cmd: %v", err) + } + cmd.Start() + ret = append(ret, &server{cmd: cmd, port: port}) + } + return ret, nil +} + +func (s *server) stop() { + s.cmd.Process.Kill() + s.cmd.Wait() +} diff --git a/xds/internal/test/e2e/e2e_test.go b/xds/internal/test/e2e/e2e_test.go new file mode 100644 index 000000000..5c116b478 --- /dev/null +++ b/xds/internal/test/e2e/e2e_test.go @@ -0,0 +1,262 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e + +import ( + "bytes" + "context" + "flag" + "fmt" + "os" + "strconv" + "testing" + "time" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" + testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/xds/internal/testutils/e2e" +) + +var ( + clientPath = flag.String("client", "./binaries/client", "The interop client") + serverPath = flag.String("server", "./binaries/server", "The interop server") +) + +func TestMain(m *testing.M) { + flag.Parse() + if _, err := os.Stat(*clientPath); os.IsNotExist(err) { + return + } + if _, err := os.Stat(*serverPath); os.IsNotExist(err) { + return + } + os.Exit(m.Run()) +} + +type testOpts struct { + testName string + backendCount int + clientFlags []string +} + +func setup(t *testing.T, opts testOpts) (*controlPlane, *client, []*server) { + t.Helper() + backendCount := 1 + if opts.backendCount != 0 { + backendCount = opts.backendCount + } + + cp, err := newControlPlane(opts.testName) + if err != nil { + t.Fatalf("failed to start control-plane: %v", err) + } + t.Cleanup(cp.stop) + + var clientLog bytes.Buffer + c, err := newClient(fmt.Sprintf("xds:///%s", opts.testName), *clientPath, cp.bootstrapContent, &clientLog, opts.clientFlags...) + if err != nil { + t.Fatalf("failed to start client: %v", err) + } + t.Cleanup(c.stop) + + var serverLog bytes.Buffer + servers, err := newServers(opts.testName, *serverPath, cp.bootstrapContent, &serverLog, backendCount) + if err != nil { + t.Fatalf("failed to start server: %v", err) + } + t.Cleanup(func() { + for _, s := range servers { + s.stop() + } + }) + t.Cleanup(func() { + // TODO: find a better way to print the log. They are long, and hide the failure. + t.Logf("\n----- client logs -----\n%v", clientLog.String()) + t.Logf("\n----- server logs -----\n%v", serverLog.String()) + }) + return cp, c, servers +} + +func TestPingPong(t *testing.T) { + const testName = "pingpong" + cp, c, _ := setup(t, testOpts{testName: testName}) + + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: testName, + NodeID: cp.nodeID, + Host: "localhost", + Port: serverPort, + SecLevel: e2e.SecurityLevelNone, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := cp.server.Update(ctx, resources); err != nil { + t.Fatalf("failed to update control plane resources: %v", err) + } + + st, err := c.clientStats(ctx) + if err != nil { + t.Fatalf("failed to get client stats: %v", err) + } + if st.NumFailures != 0 { + t.Fatalf("Got %v failures: %+v", st.NumFailures, st) + } +} + +// TestAffinity covers the affinity tests with ringhash policy. +// - client is configured to use ringhash, with 3 backends +// - all RPCs will hash a specific metadata header +// - verify that +// - all RPCs with the same metadata value are sent to the same backend +// - only one backend is Ready +// - send more RPCs with different metadata values until a new backend is picked, and verify that +// - only two backends are in Ready +func TestAffinity(t *testing.T) { + const ( + testName = "affinity" + backendCount = 3 + testMDKey = "xds_md" + testMDValue = "unary_yranu" + ) + cp, c, servers := setup(t, testOpts{ + testName: testName, + backendCount: backendCount, + clientFlags: []string{"--rpc=EmptyCall", fmt.Sprintf("--metadata=EmptyCall:%s:%s", testMDKey, testMDValue)}, + }) + + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: testName, + NodeID: cp.nodeID, + Host: "localhost", + Port: serverPort, + SecLevel: e2e.SecurityLevelNone, + }) + + // Update EDS to multiple backends. + var ports []uint32 + for _, s := range servers { + ports = append(ports, uint32(s.port)) + } + edsMsg := resources.Endpoints[0] + resources.Endpoints[0] = e2e.DefaultEndpoint( + edsMsg.ClusterName, + "localhost", + ports, + ) + + // Update CDS lbpolicy to ringhash. + cdsMsg := resources.Clusters[0] + cdsMsg.LbPolicy = v3clusterpb.Cluster_RING_HASH + + // Update RDS to hash the header. + rdsMsg := resources.Routes[0] + rdsMsg.VirtualHosts[0].Routes[0].Action = &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: cdsMsg.Name}, + HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: testMDKey, + }, + }, + }}, + }} + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := cp.server.Update(ctx, resources); err != nil { + t.Fatalf("failed to update control plane resources: %v", err) + } + + // Note: We can skip CSDS check because there's no long delay as in TD. + // + // The client stats check doesn't race with the xds resource update because + // there's only one version of xds resource, updated at the beginning of the + // test. So there's no need to retry the stats call. + // + // In the future, we may add tests that update xds in the middle. Then we + // either need to retry clientStats(), or make a CSDS check before so the + // result is stable. + + st, err := c.clientStats(ctx) + if err != nil { + t.Fatalf("failed to get client stats: %v", err) + } + if st.NumFailures != 0 { + t.Fatalf("Got %v failures: %+v", st.NumFailures, st) + } + if len(st.RpcsByPeer) != 1 { + t.Fatalf("more than 1 backends got traffic: %v, want 1", st.RpcsByPeer) + } + + // Call channelz to verify that only one subchannel is in state Ready. + scs, err := c.channelzSubChannels(ctx) + if err != nil { + t.Fatalf("failed to fetch channelz: %v", err) + } + verifySubConnStates(t, scs, map[channelzpb.ChannelConnectivityState_State]int{ + channelzpb.ChannelConnectivityState_READY: 1, + channelzpb.ChannelConnectivityState_IDLE: 2, + }) + + // Send Unary call with different metadata value with integers starting from + // 0. Stop when a second peer is picked. + var ( + diffPeerPicked bool + mdValue int + ) + for !diffPeerPicked { + if err := c.configRPCs(ctx, &testpb.ClientConfigureRequest{ + Types: []testpb.ClientConfigureRequest_RpcType{ + testpb.ClientConfigureRequest_EMPTY_CALL, + testpb.ClientConfigureRequest_UNARY_CALL, + }, + Metadata: []*testpb.ClientConfigureRequest_Metadata{ + {Type: testpb.ClientConfigureRequest_EMPTY_CALL, Key: testMDKey, Value: testMDValue}, + {Type: testpb.ClientConfigureRequest_UNARY_CALL, Key: testMDKey, Value: strconv.Itoa(mdValue)}, + }, + }); err != nil { + t.Fatalf("failed to configure RPC: %v", err) + } + + st, err := c.clientStats(ctx) + if err != nil { + t.Fatalf("failed to get client stats: %v", err) + } + if st.NumFailures != 0 { + t.Fatalf("Got %v failures: %+v", st.NumFailures, st) + } + if len(st.RpcsByPeer) == 2 { + break + } + + mdValue++ + } + + // Call channelz to verify that only one subchannel is in state Ready. + scs2, err := c.channelzSubChannels(ctx) + if err != nil { + t.Fatalf("failed to fetch channelz: %v", err) + } + verifySubConnStates(t, scs2, map[channelzpb.ChannelConnectivityState_State]int{ + channelzpb.ChannelConnectivityState_READY: 2, + channelzpb.ChannelConnectivityState_IDLE: 1, + }) +} diff --git a/xds/internal/test/e2e/e2e_utils.go b/xds/internal/test/e2e/e2e_utils.go new file mode 100644 index 000000000..34b0ee9eb --- /dev/null +++ b/xds/internal/test/e2e/e2e_utils.go @@ -0,0 +1,36 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" +) + +func verifySubConnStates(t *testing.T, scs []*channelzpb.Subchannel, want map[channelzpb.ChannelConnectivityState_State]int) { + t.Helper() + var scStatsCount = map[channelzpb.ChannelConnectivityState_State]int{} + for _, sc := range scs { + scStatsCount[sc.Data.State.State]++ + } + if diff := cmp.Diff(scStatsCount, want); diff != "" { + t.Fatalf("got unexpected number of subchannels in state Ready, %v, scs: %v", diff, scs) + } +} diff --git a/xds/internal/test/e2e/run.sh b/xds/internal/test/e2e/run.sh new file mode 100755 index 000000000..4363d6cbd --- /dev/null +++ b/xds/internal/test/e2e/run.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +mkdir binaries +go build -o ./binaries/client ../../../../interop/xds/client/ +go build -o ./binaries/server ../../../../interop/xds/server/ +go test . -v diff --git a/xds/internal/testutils/e2e/clientresources.go b/xds/internal/testutils/e2e/clientresources.go index daae1aea8..f3f7f6307 100644 --- a/xds/internal/testutils/e2e/clientresources.go +++ b/xds/internal/testutils/e2e/clientresources.go @@ -93,7 +93,7 @@ func DefaultClientResources(params ResourceParams) UpdateOptions { Listeners: []*v3listenerpb.Listener{DefaultClientListener(params.DialTarget, routeConfigName)}, Routes: []*v3routepb.RouteConfiguration{DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)}, Clusters: []*v3clusterpb.Cluster{DefaultCluster(clusterName, endpointsName, params.SecLevel)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{DefaultEndpoint(endpointsName, params.Host, params.Port)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})}, } } @@ -358,21 +358,25 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) } // DefaultEndpoint returns a basic xds Endpoint resource. -func DefaultEndpoint(clusterName string, host string, port uint32) *v3endpointpb.ClusterLoadAssignment { +func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment { + var lbEndpoints []*v3endpointpb.LbEndpoint + for _, port := range ports { + lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{ + HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{ + Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ + SocketAddress: &v3corepb.SocketAddress{ + Protocol: v3corepb.SocketAddress_TCP, + Address: host, + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, + }}, + }}, + }) + } return &v3endpointpb.ClusterLoadAssignment{ ClusterName: clusterName, Endpoints: []*v3endpointpb.LocalityLbEndpoints{{ - Locality: &v3corepb.Locality{SubZone: "subzone"}, - LbEndpoints: []*v3endpointpb.LbEndpoint{{ - HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{ - Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ - SocketAddress: &v3corepb.SocketAddress{ - Protocol: v3corepb.SocketAddress_TCP, - Address: host, - PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: uint32(port)}}, - }}, - }}, - }}, + Locality: &v3corepb.Locality{SubZone: "subzone"}, + LbEndpoints: lbEndpoints, LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1}, Priority: 0, }},