mirror of https://github.com/grpc/grpc-go.git
cdsbalancer: test cleanup part 2/N (#6554)
This commit is contained in:
parent
7f66074c37
commit
8a2c220594
|
|
@ -43,12 +43,26 @@ func init() {
|
|||
// the Attributes field of resolver.Address.
|
||||
type handshakeAttrKey struct{}
|
||||
|
||||
// Equal reports whether the handshake info structs are identical (have the
|
||||
// same pointer). This is sufficient as all subconns from one CDS balancer use
|
||||
// the same one.
|
||||
func (hi *HandshakeInfo) Equal(o any) bool {
|
||||
oh, ok := o.(*HandshakeInfo)
|
||||
return ok && oh == hi
|
||||
// Equal reports whether the handshake info structs are identical.
|
||||
func (hi *HandshakeInfo) Equal(other *HandshakeInfo) bool {
|
||||
if hi == nil && other == nil {
|
||||
return true
|
||||
}
|
||||
if hi == nil || other == nil {
|
||||
return false
|
||||
}
|
||||
if hi.rootProvider != other.rootProvider ||
|
||||
hi.identityProvider != other.identityProvider ||
|
||||
hi.requireClientCert != other.requireClientCert ||
|
||||
len(hi.sanMatchers) != len(other.sanMatchers) {
|
||||
return false
|
||||
}
|
||||
for i := range hi.sanMatchers {
|
||||
if !hi.sanMatchers[i].Equal(other.sanMatchers[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// SetHandshakeInfo returns a copy of addr in which the Attributes field is
|
||||
|
|
|
|||
|
|
@ -216,7 +216,7 @@ func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
|
|||
// StartTestService spins up a stub server exposing the TestService on a local
|
||||
// port. If the passed in server is nil, a stub server that implements only the
|
||||
// EmptyCall and UnaryCall RPCs is started.
|
||||
func StartTestService(t *testing.T, server *StubServer) *StubServer {
|
||||
func StartTestService(t *testing.T, server *StubServer, sopts ...grpc.ServerOption) *StubServer {
|
||||
if server == nil {
|
||||
server = &StubServer{
|
||||
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
|
||||
|
|
@ -225,7 +225,7 @@ func StartTestService(t *testing.T, server *StubServer) *StubServer {
|
|||
},
|
||||
}
|
||||
}
|
||||
server.StartServer()
|
||||
server.StartServer(sopts...)
|
||||
|
||||
t.Logf("Started test service backend at %q", server.Address)
|
||||
return server
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -26,7 +26,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
|
@ -35,7 +34,6 @@ import (
|
|||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/balancer/stub"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/internal/testutils/xds/e2e"
|
||||
|
|
@ -44,9 +42,6 @@ import (
|
|||
"google.golang.org/grpc/serviceconfig"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
|
||||
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
|
||||
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
|
||||
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
||||
|
|
@ -71,26 +66,6 @@ const (
|
|||
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
|
||||
)
|
||||
|
||||
var (
|
||||
defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{
|
||||
ServerURI: "self_server",
|
||||
Creds: bootstrap.ChannelCreds{
|
||||
Type: "insecure",
|
||||
},
|
||||
}
|
||||
noopODLBCfg = outlierdetection.LBConfig{}
|
||||
noopODLBCfgJSON, _ = json.Marshal(noopODLBCfg)
|
||||
wrrLocalityLBConfig = &iserviceconfig.BalancerConfig{
|
||||
Name: wrrlocality.Name,
|
||||
Config: &wrrlocality.LBConfig{
|
||||
ChildPolicy: &iserviceconfig.BalancerConfig{
|
||||
Name: "round_robin",
|
||||
},
|
||||
},
|
||||
}
|
||||
wrrLocalityLBConfigJSON, _ = json.Marshal(wrrLocalityLBConfig)
|
||||
)
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
|
@ -99,229 +74,6 @@ func Test(t *testing.T) {
|
|||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
// cdsWatchInfo wraps the update and the error sent in a CDS watch callback.
|
||||
type cdsWatchInfo struct {
|
||||
update xdsresource.ClusterUpdate
|
||||
err error
|
||||
}
|
||||
|
||||
// invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer
|
||||
// and waits for appropriate state to be pushed to the provided edsBalancer.
|
||||
func invokeWatchCbAndWait(ctx context.Context, xdsC *fakeclient.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error {
|
||||
xdsC.InvokeWatchClusterCallback(cdsW.update, cdsW.err)
|
||||
if cdsW.err != nil {
|
||||
return edsB.waitForResolverError(ctx, cdsW.err)
|
||||
}
|
||||
return edsB.waitForClientConnUpdate(ctx, wantCCS)
|
||||
}
|
||||
|
||||
// testEDSBalancer is a fake edsBalancer used to verify different actions from
|
||||
// the cdsBalancer. It contains a bunch of channels to signal different events
|
||||
// to the test.
|
||||
type testEDSBalancer struct {
|
||||
// ccsCh is a channel used to signal the receipt of a ClientConn update.
|
||||
ccsCh *testutils.Channel
|
||||
// scStateCh is a channel used to signal the receipt of a SubConn update.
|
||||
scStateCh *testutils.Channel
|
||||
// resolverErrCh is a channel used to signal a resolver error.
|
||||
resolverErrCh *testutils.Channel
|
||||
// closeCh is a channel used to signal the closing of this balancer.
|
||||
closeCh *testutils.Channel
|
||||
exitIdleCh *testutils.Channel
|
||||
// parentCC is the balancer.ClientConn passed to this test balancer as part
|
||||
// of the Build() call.
|
||||
parentCC balancer.ClientConn
|
||||
}
|
||||
|
||||
type subConnWithState struct {
|
||||
sc balancer.SubConn
|
||||
state balancer.SubConnState
|
||||
}
|
||||
|
||||
func newTestEDSBalancer() *testEDSBalancer {
|
||||
return &testEDSBalancer{
|
||||
ccsCh: testutils.NewChannel(),
|
||||
scStateCh: testutils.NewChannel(),
|
||||
resolverErrCh: testutils.NewChannel(),
|
||||
closeCh: testutils.NewChannel(),
|
||||
exitIdleCh: testutils.NewChannel(),
|
||||
}
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
|
||||
tb.ccsCh.Send(ccs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) ResolverError(err error) {
|
||||
tb.resolverErrCh.Send(err)
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
||||
tb.scStateCh.Send(subConnWithState{sc: sc, state: state})
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) Close() {
|
||||
tb.closeCh.Send(struct{}{})
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) ExitIdle() {
|
||||
tb.exitIdleCh.Send(struct{}{})
|
||||
}
|
||||
|
||||
// waitForClientConnUpdate verifies if the testEDSBalancer receives the
|
||||
// provided ClientConnState within a reasonable amount of time.
|
||||
func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS balancer.ClientConnState) error {
|
||||
ccs, err := tb.ccsCh.Receive(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gotCCS := ccs.(balancer.ClientConnState)
|
||||
if xdsclient.FromResolverState(gotCCS.ResolverState) == nil {
|
||||
return fmt.Errorf("want resolver state with XDSClient attached, got one without")
|
||||
}
|
||||
|
||||
// Calls into Cluster Resolver LB Config Equal(), which ignores JSON
|
||||
// configuration but compares the Parsed Configuration of the JSON fields
|
||||
// emitted from ParseConfig() on the cluster resolver.
|
||||
if diff := cmp.Diff(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes"), cmp.AllowUnexported(clusterresolver.LBConfig{})); diff != "" {
|
||||
return fmt.Errorf("received unexpected ClientConnState, diff (-got +want): %v", diff)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForResolverError verifies if the testEDSBalancer receives the provided
|
||||
// resolver error before the context expires.
|
||||
func (tb *testEDSBalancer) waitForResolverError(ctx context.Context, wantErr error) error {
|
||||
gotErr, err := tb.resolverErrCh.Receive(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if gotErr != wantErr {
|
||||
return fmt.Errorf("received resolver error: %v, want %v", gotErr, wantErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// cdsCCS is a helper function to construct a good update passed from the
|
||||
// xdsResolver to the cdsBalancer.
|
||||
func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {
|
||||
const cdsLBConfig = `{
|
||||
"loadBalancingConfig":[
|
||||
{
|
||||
"cds_experimental":{
|
||||
"Cluster": "%s"
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
jsonSC := fmt.Sprintf(cdsLBConfig, cluster)
|
||||
return balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{
|
||||
ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC),
|
||||
}, xdsC),
|
||||
BalancerConfig: &lbConfig{ClusterName: clusterName},
|
||||
}
|
||||
}
|
||||
|
||||
// edsCCS is a helper function to construct a Client Conn update which
|
||||
// represents what the CDS Balancer passes to the Cluster Resolver. It calls
|
||||
// into Cluster Resolver's ParseConfig to get the service config to fill out the
|
||||
// Client Conn State. This is to fill out unexported parts of the Cluster
|
||||
// Resolver config struct. Returns an empty Client Conn State if it encounters
|
||||
// an error building out the Client Conn State.
|
||||
func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy json.RawMessage, odConfig json.RawMessage) balancer.ClientConnState {
|
||||
builder := balancer.Get(clusterresolver.Name)
|
||||
if builder == nil {
|
||||
// Shouldn't happen, registered through imported Cluster Resolver,
|
||||
// defensive programming.
|
||||
logger.Errorf("%q LB policy is needed but not registered", clusterresolver.Name)
|
||||
return balancer.ClientConnState{} // will fail the calling test eventually through error in diff.
|
||||
}
|
||||
crParser, ok := builder.(balancer.ConfigParser)
|
||||
if !ok {
|
||||
// Shouldn't happen, imported Cluster Resolver builder has this method.
|
||||
logger.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name)
|
||||
return balancer.ClientConnState{}
|
||||
}
|
||||
discoveryMechanism := clusterresolver.DiscoveryMechanism{
|
||||
Type: clusterresolver.DiscoveryMechanismTypeEDS,
|
||||
Cluster: service,
|
||||
MaxConcurrentRequests: countMax,
|
||||
OutlierDetection: odConfig,
|
||||
}
|
||||
if enableLRS {
|
||||
discoveryMechanism.LoadReportingServer = defaultTestAuthorityServerConfig
|
||||
}
|
||||
lbCfg := &clusterresolver.LBConfig{
|
||||
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism},
|
||||
XDSLBPolicy: xdslbpolicy,
|
||||
}
|
||||
|
||||
crLBCfgJSON, err := json.Marshal(lbCfg)
|
||||
if err != nil {
|
||||
// Shouldn't happen, since we just prepared struct.
|
||||
logger.Errorf("cds_balancer: error marshalling prepared config: %v", lbCfg)
|
||||
return balancer.ClientConnState{}
|
||||
}
|
||||
|
||||
var sc serviceconfig.LoadBalancingConfig
|
||||
if sc, err = crParser.ParseConfig(crLBCfgJSON); err != nil {
|
||||
logger.Errorf("cds_balancer: cluster_resolver config generated %v is invalid: %v", crLBCfgJSON, err)
|
||||
return balancer.ClientConnState{}
|
||||
}
|
||||
|
||||
return balancer.ClientConnState{
|
||||
BalancerConfig: sc,
|
||||
}
|
||||
}
|
||||
|
||||
// setup creates a cdsBalancer and an edsBalancer (and overrides the
|
||||
// newChildBalancer function to return it), and also returns a cleanup function.
|
||||
func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) {
|
||||
t.Helper()
|
||||
xdsC := fakeclient.NewClient()
|
||||
builder := balancer.Get(cdsName)
|
||||
if builder == nil {
|
||||
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
|
||||
}
|
||||
tcc := testutils.NewTestClientConn(t)
|
||||
cdsB := builder.Build(tcc, balancer.BuildOptions{})
|
||||
|
||||
edsB := newTestEDSBalancer()
|
||||
oldEDSBalancerBuilder := newChildBalancer
|
||||
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
|
||||
edsB.parentCC = cc
|
||||
return edsB, nil
|
||||
}
|
||||
|
||||
return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
|
||||
newChildBalancer = oldEDSBalancerBuilder
|
||||
}
|
||||
}
|
||||
|
||||
// setupWithWatch does everything that setup does, and also pushes a ClientConn
|
||||
// update to the cdsBalancer and waits for a CDS watch call to be registered.
|
||||
func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) {
|
||||
t.Helper()
|
||||
|
||||
xdsC, cdsB, edsB, tcc, cancel := setup(t)
|
||||
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
|
||||
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
|
||||
}
|
||||
|
||||
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer ctxCancel()
|
||||
gotCluster, err := xdsC.WaitForWatchCluster(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
|
||||
}
|
||||
if gotCluster != clusterName {
|
||||
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, clusterName)
|
||||
}
|
||||
return xdsC, cdsB, edsB, tcc, cancel
|
||||
}
|
||||
|
||||
func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wantNames []string) error {
|
||||
for ctx.Err() == nil {
|
||||
select {
|
||||
|
|
|
|||
Loading…
Reference in New Issue