mirror of https://github.com/grpc/grpc-go.git
487 lines
13 KiB
Go
487 lines
13 KiB
Go
/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
|
|
|
|
package balancer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
anypb "github.com/golang/protobuf/ptypes/any"
|
|
durationpb "github.com/golang/protobuf/ptypes/duration"
|
|
structpb "github.com/golang/protobuf/ptypes/struct"
|
|
wrpb "github.com/golang/protobuf/ptypes/wrappers"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/resolver"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/grpc/xds/internal"
|
|
cdspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/cds"
|
|
addresspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/address"
|
|
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
|
|
discoverypb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/discovery"
|
|
edspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/eds"
|
|
endpointpb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/endpoint/endpoint"
|
|
adsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/discovery/v2/ads"
|
|
lrsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
|
|
)
|
|
|
|
var (
|
|
testServiceName = "test/foo"
|
|
testCDSReq = &discoverypb.DiscoveryRequest{
|
|
Node: &basepb.Node{
|
|
Metadata: &structpb.Struct{
|
|
Fields: map[string]*structpb.Value{
|
|
internal.GrpcHostname: {
|
|
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
TypeUrl: cdsType,
|
|
}
|
|
testEDSReq = &discoverypb.DiscoveryRequest{
|
|
Node: &basepb.Node{
|
|
Metadata: &structpb.Struct{
|
|
Fields: map[string]*structpb.Value{
|
|
internal.GrpcHostname: {
|
|
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
|
|
},
|
|
endpointRequired: {
|
|
Kind: &structpb.Value_BoolValue{BoolValue: true},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
TypeUrl: edsType,
|
|
}
|
|
testEDSReqWithoutEndpoints = &discoverypb.DiscoveryRequest{
|
|
Node: &basepb.Node{
|
|
Metadata: &structpb.Struct{
|
|
Fields: map[string]*structpb.Value{
|
|
internal.GrpcHostname: {
|
|
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
|
|
},
|
|
endpointRequired: {
|
|
Kind: &structpb.Value_BoolValue{BoolValue: false},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
TypeUrl: edsType,
|
|
}
|
|
testCluster = &cdspb.Cluster{
|
|
Name: testServiceName,
|
|
ClusterDiscoveryType: &cdspb.Cluster_Type{Type: cdspb.Cluster_EDS},
|
|
LbPolicy: cdspb.Cluster_ROUND_ROBIN,
|
|
}
|
|
marshaledCluster, _ = proto.Marshal(testCluster)
|
|
testCDSResp = &discoverypb.DiscoveryResponse{
|
|
Resources: []*anypb.Any{
|
|
{
|
|
TypeUrl: cdsType,
|
|
Value: marshaledCluster,
|
|
},
|
|
},
|
|
TypeUrl: cdsType,
|
|
}
|
|
testClusterLoadAssignment = &edspb.ClusterLoadAssignment{
|
|
ClusterName: testServiceName,
|
|
Endpoints: []*endpointpb.LocalityLbEndpoints{
|
|
{
|
|
Locality: &basepb.Locality{
|
|
Region: "asia-east1",
|
|
Zone: "1",
|
|
SubZone: "sa",
|
|
},
|
|
LbEndpoints: []*endpointpb.LbEndpoint{
|
|
{
|
|
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
|
|
Endpoint: &endpointpb.Endpoint{
|
|
Address: &addresspb.Address{
|
|
Address: &addresspb.Address_SocketAddress{
|
|
SocketAddress: &addresspb.SocketAddress{
|
|
Address: "1.1.1.1",
|
|
PortSpecifier: &addresspb.SocketAddress_PortValue{
|
|
PortValue: 10001,
|
|
},
|
|
ResolverName: "dns",
|
|
},
|
|
},
|
|
},
|
|
HealthCheckConfig: nil,
|
|
},
|
|
},
|
|
Metadata: &basepb.Metadata{
|
|
FilterMetadata: map[string]*structpb.Struct{
|
|
"xx.lb": {
|
|
Fields: map[string]*structpb.Value{
|
|
"endpoint_name": {
|
|
Kind: &structpb.Value_StringValue{
|
|
StringValue: "some.endpoint.name",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
LoadBalancingWeight: &wrpb.UInt32Value{
|
|
Value: 1,
|
|
},
|
|
Priority: 0,
|
|
},
|
|
},
|
|
}
|
|
marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment)
|
|
testEDSResp = &discoverypb.DiscoveryResponse{
|
|
Resources: []*anypb.Any{
|
|
{
|
|
TypeUrl: edsType,
|
|
Value: marshaledClusterLoadAssignment,
|
|
},
|
|
},
|
|
TypeUrl: edsType,
|
|
}
|
|
testClusterLoadAssignmentWithoutEndpoints = &edspb.ClusterLoadAssignment{
|
|
ClusterName: testServiceName,
|
|
Endpoints: []*endpointpb.LocalityLbEndpoints{
|
|
{
|
|
Locality: &basepb.Locality{
|
|
SubZone: "sa",
|
|
},
|
|
LoadBalancingWeight: &wrpb.UInt32Value{
|
|
Value: 128,
|
|
},
|
|
Priority: 0,
|
|
},
|
|
},
|
|
Policy: nil,
|
|
}
|
|
marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints)
|
|
testEDSRespWithoutEndpoints = &discoverypb.DiscoveryResponse{
|
|
Resources: []*anypb.Any{
|
|
{
|
|
TypeUrl: edsType,
|
|
Value: marshaledClusterLoadAssignmentWithoutEndpoints,
|
|
},
|
|
},
|
|
TypeUrl: edsType,
|
|
}
|
|
)
|
|
|
|
type testTrafficDirector struct {
|
|
reqChan chan *request
|
|
respChan chan *response
|
|
}
|
|
|
|
type request struct {
|
|
req *discoverypb.DiscoveryRequest
|
|
err error
|
|
}
|
|
|
|
type response struct {
|
|
resp *discoverypb.DiscoveryResponse
|
|
err error
|
|
}
|
|
|
|
func (ttd *testTrafficDirector) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
|
|
for {
|
|
req, err := s.Recv()
|
|
if err != nil {
|
|
ttd.reqChan <- &request{
|
|
req: nil,
|
|
err: err,
|
|
}
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
ttd.reqChan <- &request{
|
|
req: req,
|
|
err: nil,
|
|
}
|
|
if req.TypeUrl == edsType {
|
|
break
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case resp := <-ttd.respChan:
|
|
if resp.err != nil {
|
|
return resp.err
|
|
}
|
|
if err := s.Send(resp.resp); err != nil {
|
|
return err
|
|
}
|
|
case <-s.Context().Done():
|
|
return s.Context().Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ttd *testTrafficDirector) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
|
|
return status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (ttd *testTrafficDirector) sendResp(resp *response) {
|
|
ttd.respChan <- resp
|
|
}
|
|
|
|
func (ttd *testTrafficDirector) getReq() *request {
|
|
return <-ttd.reqChan
|
|
}
|
|
|
|
func newTestTrafficDirector() *testTrafficDirector {
|
|
return &testTrafficDirector{
|
|
reqChan: make(chan *request, 10),
|
|
respChan: make(chan *response, 10),
|
|
}
|
|
}
|
|
|
|
type testConfig struct {
|
|
doCDS bool
|
|
expectedRequests []*discoverypb.DiscoveryRequest
|
|
responsesToSend []*discoverypb.DiscoveryResponse
|
|
expectedADSResponses []proto.Message
|
|
adsErr error
|
|
svrErr error
|
|
}
|
|
|
|
func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsServer, cleanup func()) {
|
|
lis, err := net.Listen("tcp", "localhost:0")
|
|
if err != nil {
|
|
t.Fatalf("listen failed due to: %v", err)
|
|
}
|
|
svr := grpc.NewServer()
|
|
td = newTestTrafficDirector()
|
|
lrss = &lrsServer{
|
|
drops: make(map[string]uint64),
|
|
reportingInterval: &durationpb.Duration{
|
|
Seconds: 60 * 60, // 1 hour, each test can override this to a shorter duration.
|
|
Nanos: 0,
|
|
},
|
|
}
|
|
adsgrpc.RegisterAggregatedDiscoveryServiceServer(svr, td)
|
|
lrsgrpc.RegisterLoadReportingServiceServer(svr, lrss)
|
|
go svr.Serve(lis)
|
|
return lis.Addr().String(), td, lrss, func() {
|
|
svr.Stop()
|
|
lis.Close()
|
|
}
|
|
}
|
|
|
|
func (s) TestXdsClientResponseHandling(t *testing.T) {
|
|
for _, test := range []*testConfig{
|
|
{
|
|
doCDS: true,
|
|
expectedRequests: []*discoverypb.DiscoveryRequest{testCDSReq, testEDSReq},
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp, testEDSResp},
|
|
expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment},
|
|
},
|
|
{
|
|
doCDS: false,
|
|
expectedRequests: []*discoverypb.DiscoveryRequest{testEDSReqWithoutEndpoints},
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints},
|
|
expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints},
|
|
},
|
|
} {
|
|
testXdsClientResponseHandling(t, test)
|
|
}
|
|
}
|
|
|
|
func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
|
|
addr, td, _, cleanup := setupServer(t)
|
|
defer cleanup()
|
|
adsChan := make(chan proto.Message, 10)
|
|
newADS := func(ctx context.Context, i proto.Message) error {
|
|
adsChan <- i
|
|
return nil
|
|
}
|
|
client := newXDSClient(addr, test.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, func(context.Context) {}, func() {})
|
|
defer client.close()
|
|
go client.run()
|
|
|
|
for _, expectedReq := range test.expectedRequests {
|
|
req := td.getReq()
|
|
if req.err != nil {
|
|
t.Fatalf("ads RPC failed with err: %v", req.err)
|
|
}
|
|
if !proto.Equal(req.req, expectedReq) {
|
|
t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq)
|
|
}
|
|
}
|
|
|
|
for i, resp := range test.responsesToSend {
|
|
td.sendResp(&response{resp: resp})
|
|
ads := <-adsChan
|
|
if !proto.Equal(ads, test.expectedADSResponses[i]) {
|
|
t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i])
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s) TestXdsClientLoseContact(t *testing.T) {
|
|
for _, test := range []*testConfig{
|
|
{
|
|
doCDS: true,
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{},
|
|
},
|
|
{
|
|
doCDS: false,
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints},
|
|
},
|
|
} {
|
|
testXdsClientLoseContactRemoteClose(t, test)
|
|
}
|
|
|
|
for _, test := range []*testConfig{
|
|
{
|
|
doCDS: false,
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode.
|
|
},
|
|
{
|
|
doCDS: true,
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{{}}, // response with 0 resources is an error case.
|
|
},
|
|
{
|
|
doCDS: true,
|
|
responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp},
|
|
adsErr: errors.New("some ads parsing error from xdsBalancer"),
|
|
},
|
|
} {
|
|
testXdsClientLoseContactADSRelatedErrorOccur(t, test)
|
|
}
|
|
}
|
|
|
|
func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
|
|
addr, td, _, cleanup := setupServer(t)
|
|
defer cleanup()
|
|
adsChan := make(chan proto.Message, 10)
|
|
newADS := func(ctx context.Context, i proto.Message) error {
|
|
adsChan <- i
|
|
return nil
|
|
}
|
|
contactChan := make(chan *loseContact, 10)
|
|
loseContactFunc := func(context.Context) {
|
|
contactChan <- &loseContact{}
|
|
}
|
|
client := newXDSClient(addr, test.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
|
|
defer client.close()
|
|
go client.run()
|
|
|
|
// make sure server side get the request (i.e stream created successfully on client side)
|
|
td.getReq()
|
|
|
|
for _, resp := range test.responsesToSend {
|
|
td.sendResp(&response{resp: resp})
|
|
// make sure client side receives it
|
|
<-adsChan
|
|
}
|
|
cleanup()
|
|
|
|
select {
|
|
case <-contactChan:
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("time out when expecting lost contact signal")
|
|
}
|
|
}
|
|
|
|
func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
|
|
addr, td, _, cleanup := setupServer(t)
|
|
defer cleanup()
|
|
|
|
adsChan := make(chan proto.Message, 10)
|
|
newADS := func(ctx context.Context, i proto.Message) error {
|
|
adsChan <- i
|
|
return test.adsErr
|
|
}
|
|
contactChan := make(chan *loseContact, 10)
|
|
loseContactFunc := func(context.Context) {
|
|
contactChan <- &loseContact{}
|
|
}
|
|
client := newXDSClient(addr, test.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
|
|
defer client.close()
|
|
go client.run()
|
|
|
|
// make sure server side get the request (i.e stream created successfully on client side)
|
|
td.getReq()
|
|
|
|
for _, resp := range test.responsesToSend {
|
|
td.sendResp(&response{resp: resp})
|
|
}
|
|
|
|
select {
|
|
case <-contactChan:
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("time out when expecting lost contact signal")
|
|
}
|
|
}
|
|
|
|
func (s) TestXdsClientExponentialRetry(t *testing.T) {
|
|
cfg := &testConfig{
|
|
svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
|
|
}
|
|
addr, td, _, cleanup := setupServer(t)
|
|
defer cleanup()
|
|
|
|
adsChan := make(chan proto.Message, 10)
|
|
newADS := func(ctx context.Context, i proto.Message) error {
|
|
adsChan <- i
|
|
return nil
|
|
}
|
|
contactChan := make(chan *loseContact, 10)
|
|
loseContactFunc := func(context.Context) {
|
|
contactChan <- &loseContact{}
|
|
}
|
|
client := newXDSClient(addr, cfg.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
|
|
defer client.close()
|
|
go client.run()
|
|
|
|
var secondRetry, thirdRetry time.Time
|
|
for i := 0; i < 3; i++ {
|
|
// make sure server side get the request (i.e stream created successfully on client side)
|
|
td.getReq()
|
|
td.sendResp(&response{err: cfg.svrErr})
|
|
|
|
select {
|
|
case <-contactChan:
|
|
if i == 1 {
|
|
secondRetry = time.Now()
|
|
}
|
|
if i == 2 {
|
|
thirdRetry = time.Now()
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("time out when expecting lost contact signal")
|
|
}
|
|
}
|
|
if thirdRetry.Sub(secondRetry) < 1*time.Second {
|
|
t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry))
|
|
}
|
|
}
|