internal/testutils: add a new test type that implements resolver.ClientConn (#6668)

This commit is contained in:
Easwar Swaminathan 2023-10-12 10:09:38 -07:00 committed by GitHub
parent 32e3ef1ed1
commit dd4c0adafb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1244 additions and 1456 deletions

View File

@ -167,7 +167,7 @@ func init() {
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -329,7 +329,7 @@ func (s) TestWeightedTarget(t *testing.T) {
// have a weighted target balancer will one sub-balancer, and we add and remove
// backends from the subBalancer.
func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -427,7 +427,7 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
// TestWeightedTarget_TwoSubBalancers_OneBackend tests the case where we have a
// weighted target balancer with two sub-balancers, each with one backend.
func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -493,7 +493,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
// a weighted target balancer with two sub-balancers, each with more than one
// backend.
func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -637,7 +637,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
// case where we have a weighted target balancer with two sub-balancers of
// differing weights.
func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -718,7 +718,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *test
// have a weighted target balancer with three sub-balancers and we remove one of
// the subBalancers.
func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -879,7 +879,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// where we have a weighted target balancer with two sub-balancers, and we
// change the weight of these subBalancers.
func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -997,7 +997,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing
// the picks won't fail with transient_failure, and should instead wait for the
// other sub-balancer.
func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -1059,7 +1059,7 @@ func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -1141,7 +1141,7 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes
// Verify that a SubConn is created with the expected address and hierarchy
// path cleared.
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.TestClientConn, addr resolver.Address) {
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.BalancerClientConn, addr resolver.Address) {
t.Helper()
gotAddr := <-cc.NewSubConnAddrsCh
@ -1163,7 +1163,7 @@ type subConnWithAddr struct {
//
// Returned value is a map from subBalancer (identified by its config) to
// subConns created by it.
func waitForNewSubConns(t *testing.T, cc *testutils.TestClientConn, num int) map[string][]subConnWithAddr {
func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr {
t.Helper()
scs := make(map[string][]subConnWithAddr)
@ -1233,7 +1233,7 @@ func init() {
// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func (s) TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -1274,7 +1274,7 @@ func (s) TestInitialIdle(t *testing.T) {
// TestIgnoreSubBalancerStateTransitions covers the case that if the child reports a
// transition from TF to Connecting, the overall state will still be TF.
func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
@ -1314,17 +1314,17 @@ func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
*testutils.BalancerClientConn
states []balancer.State
}
func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
t.BalancerClientConn.UpdateState(bs)
}
func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {

View File

@ -48,9 +48,9 @@ import (
"google.golang.org/grpc/status"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
)
const (

View File

@ -49,8 +49,8 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func setup(t *testing.T) (*testutils.TestClientConn, *Balancer) {
tcc := testutils.NewTestClientConn(t)
func setup(t *testing.T) (*testutils.BalancerClientConn, *Balancer) {
tcc := testutils.NewBalancerClientConn(t)
return tcc, NewBalancer(tcc, balancer.BuildOptions{})
}

View File

@ -73,7 +73,7 @@ func Test(t *testing.T) {
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
@ -176,7 +176,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
stub.Register(balancerName, stub.BalancerFuncs{})
builder := balancer.Get(balancerName)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
@ -203,8 +203,8 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]*testutils.TestSubConn) {
cc := testutils.NewTestClientConn(t)
func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.BalancerClientConn, map[resolver.Address]*testutils.TestSubConn) {
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
@ -503,7 +503,7 @@ func (s) TestBalancerGroupBuildOptions(t *testing.T) {
return nil
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bg := New(Options{
CC: cc,
BuildOpts: bOpts,
@ -531,7 +531,7 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
exitIdleCh <- struct{}{}
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bg := New(Options{
CC: cc,
BuildOpts: balancer.BuildOptions{},
@ -561,7 +561,7 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
// for the second passed in address and also only picks that created SubConn.
// The new aggregated picker should reflect this change for the child.
func (s) TestBalancerGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{

View File

@ -23,7 +23,6 @@ package dns
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
@ -37,6 +36,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/resolver/dns/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@ -47,15 +47,11 @@ var EnableSRVLookups = false
var logger = grpclog.Component("dns")
// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
// single variable for testing the resolver?
var (
newTimer = time.NewTimer
newTimerDNSResRate = time.NewTimer
)
func init() {
resolver.Register(NewBuilder())
internal.TimeAfterFunc = time.After
internal.NewNetResolver = newNetResolver
internal.AddressDialer = addressDialer
}
const (
@ -70,23 +66,6 @@ const (
txtAttribute = "grpc_config="
)
var (
errMissingAddr = errors.New("dns resolver: missing address")
// Addresses ending with a colon that is supposed to be the separator
// between host and port is not allowed. E.g. "::" is a valid address as
// it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
// a colon as the host and port separator
errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
)
var (
defaultResolver netResolver = net.DefaultResolver
// To prevent excessive re-resolution, we enforce a rate limit on DNS
// resolution requests.
minDNSResRate = 30 * time.Second
)
var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var dialer net.Dialer
@ -94,7 +73,11 @@ var addressDialer = func(address string) func(context.Context, string, string) (
}
}
var newNetResolver = func(authority string) (netResolver, error) {
var newNetResolver = func(authority string) (internal.NetResolver, error) {
if authority == "" {
return net.DefaultResolver, nil
}
host, port, err := parseTarget(authority, defaultDNSSvrPort)
if err != nil {
return nil, err
@ -104,7 +87,7 @@ var newNetResolver = func(authority string) (netResolver, error) {
return &net.Resolver{
PreferGo: true,
Dial: addressDialer(authorityWithPort),
Dial: internal.AddressDialer(authorityWithPort),
}, nil
}
@ -142,13 +125,9 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
disableServiceConfig: opts.DisableServiceConfig,
}
if target.URL.Host == "" {
d.resolver = defaultResolver
} else {
d.resolver, err = newNetResolver(target.URL.Host)
if err != nil {
return nil, err
}
d.resolver, err = internal.NewNetResolver(target.URL.Host)
if err != nil {
return nil, err
}
d.wg.Add(1)
@ -161,12 +140,6 @@ func (b *dnsBuilder) Scheme() string {
return "dns"
}
type netResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}
// deadResolver is a resolver that does nothing.
type deadResolver struct{}
@ -178,7 +151,7 @@ func (deadResolver) Close() {}
type dnsResolver struct {
host string
port string
resolver netResolver
resolver internal.NetResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
@ -223,29 +196,27 @@ func (d *dnsResolver) watcher() {
err = d.cc.UpdateState(*state)
}
var timer *time.Timer
var waitTime time.Duration
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
waitTime = internal.MinResolutionRate
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from
// ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
waitTime = backoff.DefaultExponential.Backoff(backoffIndex)
backoffIndex++
}
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-timer.C:
case <-internal.TimeAfterFunc(waitTime):
}
}
}
@ -387,7 +358,7 @@ func formatIP(addr string) (addrIP string, ok bool) {
// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
func parseTarget(target, defaultPort string) (host, port string, err error) {
if target == "" {
return "", "", errMissingAddr
return "", "", internal.ErrMissingAddr
}
if ip := net.ParseIP(target); ip != nil {
// target is an IPv4 or IPv6(without brackets) address
@ -397,7 +368,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {
if port == "" {
// If the port field is empty (target ends with colon), e.g. "[::1]:",
// this is an error.
return "", "", errEndsWithColon
return "", "", internal.ErrEndsWithColon
}
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,123 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package dns_test
import (
"context"
"net"
"sync"
"google.golang.org/grpc/internal/testutils"
)
// A fake implementation of the internal.NetResolver interface for use in tests.
type testNetResolver struct {
// A write to this channel is made when this resolver receives a resolution
// request. Tests can rely on reading from this channel to be notified about
// resolution requests instead of sleeping for a predefined period of time.
lookupHostCh *testutils.Channel
mu sync.Mutex
hostLookupTable map[string][]string // Name --> list of addresses
srvLookupTable map[string][]*net.SRV // Name --> list of SRV records
txtLookupTable map[string][]string // Name --> service config for TXT record
}
func (tr *testNetResolver) LookupHost(ctx context.Context, host string) ([]string, error) {
if tr.lookupHostCh != nil {
tr.lookupHostCh.Send(nil)
}
tr.mu.Lock()
defer tr.mu.Unlock()
if addrs, ok := tr.hostLookupTable[host]; ok {
return addrs, nil
}
return nil, &net.DNSError{
Err: "hostLookup error",
Name: host,
Server: "fake",
IsTemporary: true,
}
}
func (tr *testNetResolver) UpdateHostLookupTable(table map[string][]string) {
tr.mu.Lock()
tr.hostLookupTable = table
tr.mu.Unlock()
}
func (tr *testNetResolver) LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) {
tr.mu.Lock()
defer tr.mu.Unlock()
cname := "_" + service + "._" + proto + "." + name
if srvs, ok := tr.srvLookupTable[cname]; ok {
return cname, srvs, nil
}
return "", nil, &net.DNSError{
Err: "srvLookup error",
Name: cname,
Server: "fake",
IsTemporary: true,
}
}
func (tr *testNetResolver) LookupTXT(ctx context.Context, host string) ([]string, error) {
tr.mu.Lock()
defer tr.mu.Unlock()
if sc, ok := tr.txtLookupTable[host]; ok {
return sc, nil
}
return nil, &net.DNSError{
Err: "txtLookup error",
Name: host,
Server: "fake",
IsTemporary: true,
}
}
func (tr *testNetResolver) UpdateTXTLookupTable(table map[string][]string) {
tr.mu.Lock()
tr.txtLookupTable = table
tr.mu.Unlock()
}
// txtRecordServiceConfig generates a slice of strings (aggregately representing
// a single service config file) for the input config string, that represents
// the result from a real DNS TXT record lookup.
func txtRecordServiceConfig(cfg string) []string {
// In DNS, service config is encoded in a TXT record via the mechanism
// described in RFC-1464 using the attribute name grpc_config.
b := append([]byte("grpc_config="), []byte(cfg)...)
// Split b into multiple strings, each with a max of 255 bytes, which is
// the DNS TXT record limit.
var r []string
for i := 0; i < len(b); i += txtBytesLimit {
if i+txtBytesLimit > len(b) {
r = append(r, string(b[i:]))
} else {
r = append(r, string(b[i:i+txtBytesLimit]))
}
}
return r
}

View File

@ -0,0 +1,70 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package internal contains functionality internal to the dns resolver package.
package internal
import (
"context"
"errors"
"net"
"time"
)
// NetResolver groups the methods on net.Resolver that are used by the DNS
// resolver implementation. This allows the default net.Resolver instance to be
// overidden from tests.
type NetResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}
var (
// ErrMissingAddr is the error returned when building a DNS resolver when
// the provided target name is empty.
ErrMissingAddr = errors.New("dns resolver: missing address")
// ErrEndsWithColon is the error returned when building a DNS resolver when
// the provided target name ends with a colon that is supposed to be the
// separator between host and port. E.g. "::" is a valid address as it is
// an IPv6 address (host only) and "[::]:" is invalid as it ends with a
// colon as the host and port separator
ErrEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
)
// The following vars are overridden from tests.
var (
// MinResolutionRate is the minimum rate at which re-resolutions are
// allowed. This helps to prevent excessive re-resolution.
MinResolutionRate = 30 * time.Second
// TimeAfterFunc is used by the DNS resolver to wait for the given duration
// to elapse. In non-test code, this is implemented by time.After. In test
// code, this can be used to control the amount of time the resolver is
// blocked waiting for the duration to elapse.
TimeAfterFunc func(time.Duration) <-chan time.Time
// NewNetResolver returns the net.Resolver instance for the given target.
NewNetResolver func(string) (NetResolver, error)
// AddressDialer is the dialer used to dial the DNS server. It accepts the
// Host portion of the URL corresponding to the user's dial target and
// returns a dial function.
AddressDialer func(address string) func(context.Context, string, string) (net.Conn, error)
)

View File

@ -30,16 +30,9 @@ import (
"google.golang.org/grpc/resolver"
)
// testingLogger wraps the logging methods from testing.T.
type testingLogger interface {
Log(args ...any)
Logf(format string, args ...any)
Errorf(format string, args ...any)
}
// TestSubConn implements the SubConn interface, to be used in tests.
type TestSubConn struct {
tcc *TestClientConn // the CC that owns this SubConn
tcc *BalancerClientConn // the CC that owns this SubConn
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
@ -98,9 +91,9 @@ func (tsc *TestSubConn) String() string {
return tsc.id
}
// TestClientConn is a mock balancer.ClientConn used in tests.
type TestClientConn struct {
logger testingLogger
// BalancerClientConn is a mock balancer.ClientConn used in tests.
type BalancerClientConn struct {
logger Logger
NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
NewSubConnCh chan *TestSubConn // the last 10 subconn created.
@ -114,9 +107,9 @@ type TestClientConn struct {
subConnIdx int
}
// NewTestClientConn creates a TestClientConn.
func NewTestClientConn(t *testing.T) *TestClientConn {
return &TestClientConn{
// NewBalancerClientConn creates a BalancerClientConn.
func NewBalancerClientConn(t *testing.T) *BalancerClientConn {
return &BalancerClientConn{
logger: t,
NewSubConnAddrsCh: make(chan []resolver.Address, 10),
@ -131,7 +124,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
}
// NewSubConn creates a new SubConn.
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &TestSubConn{
tcc: tcc,
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
@ -156,13 +149,13 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon
// RemoveSubConn is a nop; tests should all be updated to use sc.Shutdown()
// instead.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
func (tcc *BalancerClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
// UpdateAddresses updates the addresses on the SubConn.
func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
tcc.logger.Logf("testClientConn: UpdateAddresses(%v, %+v)", sc, addrs)
func (tcc *BalancerClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
tcc.logger.Logf("testutils.BalancerClientConn: UpdateAddresses(%v, %+v)", sc, addrs)
select {
case tcc.UpdateAddressesAddrsCh <- addrs:
default:
@ -170,8 +163,8 @@ func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver
}
// UpdateState updates connectivity state and picker.
func (tcc *TestClientConn) UpdateState(bs balancer.State) {
tcc.logger.Logf("testClientConn: UpdateState(%v)", bs)
func (tcc *BalancerClientConn) UpdateState(bs balancer.State) {
tcc.logger.Logf("testutils.BalancerClientConn: UpdateState(%v)", bs)
select {
case <-tcc.NewStateCh:
default:
@ -186,7 +179,7 @@ func (tcc *TestClientConn) UpdateState(bs balancer.State) {
}
// ResolveNow panics.
func (tcc *TestClientConn) ResolveNow(o resolver.ResolveNowOptions) {
func (tcc *BalancerClientConn) ResolveNow(o resolver.ResolveNowOptions) {
select {
case <-tcc.ResolveNowCh:
default:
@ -195,14 +188,14 @@ func (tcc *TestClientConn) ResolveNow(o resolver.ResolveNowOptions) {
}
// Target panics.
func (tcc *TestClientConn) Target() string {
func (tcc *BalancerClientConn) Target() string {
panic("not implemented")
}
// WaitForErrPicker waits until an error picker is pushed to this ClientConn.
// Returns error if the provided context expires or a non-error picker is pushed
// to the ClientConn.
func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error {
func (tcc *BalancerClientConn) WaitForErrPicker(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.New("timeout when waiting for an error picker")
@ -218,7 +211,7 @@ func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error {
// ClientConn with the error matching the wanted error. Returns an error if
// the provided context expires, including the last received picker error (if
// any).
func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
func (tcc *BalancerClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
lastErr := errors.New("received no picker")
for {
select {
@ -235,7 +228,7 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error)
// WaitForConnectivityState waits until the state pushed to this ClientConn
// matches the wanted state. Returns an error if the provided context expires,
// including the last received state (if any).
func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
func (tcc *BalancerClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
var lastState connectivity.State = -1
for {
select {
@ -255,7 +248,7 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co
// is pending) to be considered. Returns an error if the provided context
// expires, including the last received error from IsRoundRobin or the picker
// (if any).
func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
func (tcc *BalancerClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
lastErr := errors.New("received no picker")
for {
select {
@ -294,7 +287,7 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
// WaitForPicker waits for a picker that results in f returning nil. If the
// context expires, returns the last error returned by f (if any).
func (tcc *TestClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
func (tcc *BalancerClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
lastErr := errors.New("received no picker")
for {
select {

View File

@ -0,0 +1,70 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
// Logger wraps the logging methods from testing.T.
type Logger interface {
Log(args ...any)
Logf(format string, args ...any)
Errorf(format string, args ...any)
}
// ResolverClientConn is a fake implemetation of the resolver.ClientConn
// interface to be used in tests.
type ResolverClientConn struct {
resolver.ClientConn // Embedding the interface to avoid implementing deprecated methods.
Logger Logger // Tests should pass testing.T for this.
UpdateStateF func(resolver.State) error // Invoked when resolver pushes a state update.
ReportErrorF func(err error) // Invoked when resolver pushes an error.
}
// UpdateState invokes the test specified callback with the update received from
// the resolver. If the callback returns a non-nil error, the same will be
// propagated to the resolver.
func (t *ResolverClientConn) UpdateState(s resolver.State) error {
t.Logger.Logf("testutils.ResolverClientConn: UpdateState(%s)", pretty.ToJSON(s))
if t.UpdateStateF != nil {
return t.UpdateStateF(s)
}
return nil
}
// ReportError pushes the error received from the resolver on to ErrorCh.
func (t *ResolverClientConn) ReportError(err error) {
t.Logger.Logf("testutils.ResolverClientConn: ReportError(%v)", err)
if t.ReportErrorF != nil {
t.ReportErrorF(err)
}
}
// ParseServiceConfig parses the provided service by delegating the work to the
// implementation in the grpc package.
func (t *ResolverClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
return internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
}

View File

@ -92,7 +92,7 @@ func (s) TestDropByCategory(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
@ -251,7 +251,7 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
@ -363,7 +363,7 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
// Create a stub balancer which waits for the cluster_impl policy to be
@ -436,7 +436,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
@ -511,7 +511,7 @@ func (s) TestReResolution(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
@ -578,7 +578,7 @@ func (s) TestLoadReporting(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
@ -692,7 +692,7 @@ func (s) TestUpdateLRSServer(t *testing.T) {
xdsC := fakeclient.NewClient()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

View File

@ -75,7 +75,7 @@ func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC ba
}
func TestClusterPicks(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(balancerName)
parser := builder.(balancer.ConfigParser)
bal := builder.Build(cc, balancer.BuildOptions{})
@ -154,7 +154,7 @@ func TestClusterPicks(t *testing.T) {
// TestConfigUpdateAddCluster covers the cases the balancer receives config
// update with extra clusters.
func TestConfigUpdateAddCluster(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(balancerName)
parser := builder.(balancer.ConfigParser)
bal := builder.Build(cc, balancer.BuildOptions{})
@ -312,7 +312,7 @@ func TestConfigUpdateAddCluster(t *testing.T) {
// TestRoutingConfigUpdateDeleteAll covers the cases the balancer receives
// config update with no clusters. Pick should fail with details in error.
func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(balancerName)
parser := builder.(balancer.ConfigParser)
bal := builder.Build(cc, balancer.BuildOptions{})
@ -498,7 +498,7 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(balancerName)
parser := builder.(balancer.ConfigParser)
bal := builder.Build(cc, bOpts)
@ -558,7 +558,7 @@ func init() {
// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(balancerName)
parser := builder.(balancer.ConfigParser)
bal := builder.Build(cc, balancer.BuildOptions{})
@ -605,7 +605,7 @@ func TestInitialIdle(t *testing.T) {
// it's state and completes the graceful switch process the new picker should
// reflect this change.
func TestClusterGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(balancerName)
parser := builder.(balancer.ConfigParser)
bal := builder.Build(cc, balancer.BuildOptions{})
@ -708,17 +708,17 @@ func TestClusterGracefulSwitch(t *testing.T) {
// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
*testutils.BalancerClientConn
states []balancer.State
}
func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
t.BalancerClientConn.UpdateState(bs)
}
func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {

View File

@ -543,13 +543,13 @@ type subConnWithState struct {
state balancer.SubConnState
}
func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.TestClientConn, func()) {
func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.BalancerClientConn, func()) {
t.Helper()
builder := balancer.Get(Name)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", Name)
}
tcc := testutils.NewTestClientConn(t)
tcc := testutils.NewBalancerClientConn(t)
odB := builder.Build(tcc, balancer.BuildOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, time.Now().Unix(), nil)})
return odB.(*outlierDetectionBalancer), tcc, odB.Close
}

View File

@ -81,7 +81,7 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -195,7 +195,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -358,7 +358,7 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -442,7 +442,7 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -543,7 +543,7 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -645,7 +645,7 @@ func (s) TestPriority_InitTimeout(t *testing.T) {
}
}()()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -716,7 +716,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
}
}()()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -875,7 +875,7 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -967,7 +967,7 @@ func (s) TestPriority_FirstPriorityUnavailable(t *testing.T) {
}(DefaultPriorityInitTimeout)
DefaultPriorityInitTimeout = testPriorityInitTimeout
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1013,7 +1013,7 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1110,7 +1110,7 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1206,7 +1206,7 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1310,7 +1310,7 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) {
}
}()()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1409,7 +1409,7 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1508,7 +1508,7 @@ func (s) TestPriority_ChildPolicyUpdatePickerInline(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1551,7 +1551,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1650,7 +1650,7 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1772,7 +1772,7 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1838,7 +1838,7 @@ func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()
@ -1919,7 +1919,7 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()

View File

@ -28,7 +28,7 @@ import (
)
func (s) TestIgnoreResolveNowClientConn(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
ignoreCC := newIgnoreResolveNowClientConn(cc, false)
// Call ResolveNow() on the CC, it should be forwarded.

View File

@ -67,9 +67,9 @@ func ctxWithHash(h uint64) context.Context {
}
// setupTest creates the balancer, and does an initial sanity check.
func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientConn, balancer.Balancer, balancer.Picker) {
func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.BalancerClientConn, balancer.Balancer, balancer.Picker) {
t.Helper()
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
builder := balancer.Get(Name)
b := builder.Build(cc, balancer.BuildOptions{})
if b == nil {

View File

@ -167,7 +167,7 @@ func (s) TestUpdateClientConnState(t *testing.T) {
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", Name)
}
tcc := testutils.NewTestClientConn(t)
tcc := testutils.NewBalancerClientConn(t)
bal := builder.Build(tcc, balancer.BuildOptions{})
defer bal.Close()
wrrL := bal.(*wrrLocalityBalancer)