mirror of https://github.com/grpc/grpc-go.git
414 lines
13 KiB
Go
414 lines
13 KiB
Go
/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
|
|
|
|
package testutils
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/experimental/stats"
|
|
"google.golang.org/grpc/internal"
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
"google.golang.org/grpc/resolver"
|
|
|
|
istats "google.golang.org/grpc/internal/stats"
|
|
)
|
|
|
|
// TestSubConn implements the SubConn interface, to be used in tests.
|
|
type TestSubConn struct {
|
|
balancer.SubConn
|
|
tcc *BalancerClientConn // the CC that owns this SubConn
|
|
id string
|
|
ConnectCh chan struct{}
|
|
stateListener func(balancer.SubConnState)
|
|
healthListener func(balancer.SubConnState)
|
|
connectCalled *grpcsync.Event
|
|
Addresses []resolver.Address
|
|
}
|
|
|
|
// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
|
|
// should be created via TestClientConn.NewSubConn instead, but can be useful
|
|
// for some tests.
|
|
func NewTestSubConn(id string) *TestSubConn {
|
|
return &TestSubConn{
|
|
ConnectCh: make(chan struct{}, 1),
|
|
connectCalled: grpcsync.NewEvent(),
|
|
id: id,
|
|
}
|
|
}
|
|
|
|
// UpdateAddresses is a no-op.
|
|
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {}
|
|
|
|
// Connect is a no-op.
|
|
func (tsc *TestSubConn) Connect() {
|
|
tsc.connectCalled.Fire()
|
|
select {
|
|
case tsc.ConnectCh <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// GetOrBuildProducer is a no-op.
|
|
func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
|
|
return nil, nil
|
|
}
|
|
|
|
// UpdateState pushes the state to the listener, if one is registered.
|
|
func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
|
|
<-tsc.connectCalled.Done()
|
|
if tsc.stateListener != nil {
|
|
tsc.stateListener(state)
|
|
}
|
|
// pickfirst registers a health listener synchronously while handing updates
|
|
// to READY. It updates the balancing state only after receiving the health
|
|
// update. We update the health state here so callers of tsc.UpdateState
|
|
// can verify picker updates as soon as UpdateState returns.
|
|
if state.ConnectivityState == connectivity.Ready && tsc.healthListener != nil {
|
|
tsc.healthListener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
|
|
}
|
|
}
|
|
|
|
// Shutdown pushes the SubConn to the ShutdownSubConn channel in the parent
|
|
// TestClientConn.
|
|
func (tsc *TestSubConn) Shutdown() {
|
|
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
|
|
select {
|
|
case tsc.tcc.ShutdownSubConnCh <- tsc:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// String implements stringer to print human friendly error message.
|
|
func (tsc *TestSubConn) String() string {
|
|
return tsc.id
|
|
}
|
|
|
|
// RegisterHealthListener sends a READY update to mock a situation when no
|
|
// health checking mechanisms are configured.
|
|
func (tsc *TestSubConn) RegisterHealthListener(lis func(balancer.SubConnState)) {
|
|
tsc.healthListener = lis
|
|
}
|
|
|
|
// BalancerClientConn is a mock balancer.ClientConn used in tests.
|
|
type BalancerClientConn struct {
|
|
internal.EnforceClientConnEmbedding
|
|
logger Logger
|
|
|
|
NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
|
|
NewSubConnCh chan *TestSubConn // the last 10 subconn created.
|
|
ShutdownSubConnCh chan *TestSubConn // the last 10 subconn removed.
|
|
UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses().
|
|
|
|
NewPickerCh chan balancer.Picker // the last picker updated.
|
|
NewStateCh chan connectivity.State // the last state.
|
|
ResolveNowCh chan resolver.ResolveNowOptions // the last ResolveNow().
|
|
|
|
subConnIdx int
|
|
}
|
|
|
|
// NewBalancerClientConn creates a BalancerClientConn.
|
|
func NewBalancerClientConn(t *testing.T) *BalancerClientConn {
|
|
return &BalancerClientConn{
|
|
logger: t,
|
|
|
|
NewSubConnAddrsCh: make(chan []resolver.Address, 10),
|
|
NewSubConnCh: make(chan *TestSubConn, 10),
|
|
ShutdownSubConnCh: make(chan *TestSubConn, 10),
|
|
UpdateAddressesAddrsCh: make(chan []resolver.Address, 1),
|
|
|
|
NewPickerCh: make(chan balancer.Picker, 1),
|
|
NewStateCh: make(chan connectivity.State, 1),
|
|
ResolveNowCh: make(chan resolver.ResolveNowOptions, 1),
|
|
}
|
|
}
|
|
|
|
// NewSubConn creates a new SubConn.
|
|
func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
|
sc := &TestSubConn{
|
|
tcc: tcc,
|
|
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
|
|
ConnectCh: make(chan struct{}, 1),
|
|
stateListener: o.StateListener,
|
|
connectCalled: grpcsync.NewEvent(),
|
|
Addresses: a,
|
|
}
|
|
tcc.subConnIdx++
|
|
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
|
|
select {
|
|
case tcc.NewSubConnAddrsCh <- a:
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case tcc.NewSubConnCh <- sc:
|
|
default:
|
|
}
|
|
|
|
return sc, nil
|
|
}
|
|
|
|
// MetricsRecorder returns an empty MetricsRecorderList.
|
|
func (*BalancerClientConn) MetricsRecorder() stats.MetricsRecorder {
|
|
return istats.NewMetricsRecorderList(nil)
|
|
}
|
|
|
|
// RemoveSubConn is a nop; tests should all be updated to use sc.Shutdown()
|
|
// instead.
|
|
func (tcc *BalancerClientConn) RemoveSubConn(sc balancer.SubConn) {
|
|
tcc.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
|
|
}
|
|
|
|
// UpdateAddresses updates the addresses on the SubConn.
|
|
func (tcc *BalancerClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
|
|
tcc.logger.Logf("testutils.BalancerClientConn: UpdateAddresses(%v, %+v)", sc, addrs)
|
|
select {
|
|
case tcc.UpdateAddressesAddrsCh <- addrs:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// UpdateState updates connectivity state and picker.
|
|
func (tcc *BalancerClientConn) UpdateState(bs balancer.State) {
|
|
tcc.logger.Logf("testutils.BalancerClientConn: UpdateState(%v)", bs)
|
|
select {
|
|
case <-tcc.NewStateCh:
|
|
default:
|
|
}
|
|
tcc.NewStateCh <- bs.ConnectivityState
|
|
|
|
select {
|
|
case <-tcc.NewPickerCh:
|
|
default:
|
|
}
|
|
tcc.NewPickerCh <- bs.Picker
|
|
}
|
|
|
|
// ResolveNow panics.
|
|
func (tcc *BalancerClientConn) ResolveNow(o resolver.ResolveNowOptions) {
|
|
select {
|
|
case <-tcc.ResolveNowCh:
|
|
default:
|
|
}
|
|
tcc.ResolveNowCh <- o
|
|
}
|
|
|
|
// Target panics.
|
|
func (tcc *BalancerClientConn) Target() string {
|
|
panic("not implemented")
|
|
}
|
|
|
|
// WaitForErrPicker waits until an error picker is pushed to this ClientConn.
|
|
// Returns error if the provided context expires or a non-error picker is pushed
|
|
// to the ClientConn.
|
|
func (tcc *BalancerClientConn) WaitForErrPicker(ctx context.Context) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.New("timeout when waiting for an error picker")
|
|
case picker := <-tcc.NewPickerCh:
|
|
if _, perr := picker.Pick(balancer.PickInfo{}); perr == nil {
|
|
return fmt.Errorf("balancer returned a picker which is not an error picker")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// WaitForPickerWithErr waits until an error picker is pushed to this
|
|
// ClientConn with the error matching the wanted error. Returns an error if
|
|
// the provided context expires, including the last received picker error (if
|
|
// any).
|
|
func (tcc *BalancerClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
|
|
lastErr := errors.New("received no picker")
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr)
|
|
case picker := <-tcc.NewPickerCh:
|
|
if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr != nil && lastErr.Error() == want.Error() {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForConnectivityState waits until the state pushed to this ClientConn
|
|
// matches the wanted state. Returns an error if the provided context expires,
|
|
// including the last received state (if any).
|
|
func (tcc *BalancerClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
|
|
var lastState connectivity.State = -1
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState)
|
|
case s := <-tcc.NewStateCh:
|
|
if s == want {
|
|
return nil
|
|
}
|
|
lastState = s
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also
|
|
// drains the matching state channel and requires it to be READY (if an entry
|
|
// is pending) to be considered. Returns an error if the provided context
|
|
// expires, including the last received error from IsRoundRobin or the picker
|
|
// (if any).
|
|
func (tcc *BalancerClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
|
|
lastErr := errors.New("received no picker")
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr)
|
|
case p := <-tcc.NewPickerCh:
|
|
s := connectivity.Ready
|
|
select {
|
|
case s = <-tcc.NewStateCh:
|
|
default:
|
|
}
|
|
if s != connectivity.Ready {
|
|
lastErr = fmt.Errorf("received state %v instead of ready", s)
|
|
break
|
|
}
|
|
var pickerErr error
|
|
if err := IsRoundRobin(want, func() balancer.SubConn {
|
|
sc, err := p.Pick(balancer.PickInfo{})
|
|
if err != nil {
|
|
pickerErr = err
|
|
} else if sc.Done != nil {
|
|
sc.Done(balancer.DoneInfo{})
|
|
}
|
|
return sc.SubConn
|
|
}); pickerErr != nil {
|
|
lastErr = pickerErr
|
|
continue
|
|
} else if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForPicker waits for a picker that results in f returning nil. If the
|
|
// context expires, returns the last error returned by f (if any).
|
|
func (tcc *BalancerClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
|
|
lastErr := errors.New("received no picker")
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("timeout when waiting for picker; last error: %v", lastErr)
|
|
case p := <-tcc.NewPickerCh:
|
|
if err := f(p); err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// IsRoundRobin checks whether f's return value is roundrobin of elements from
|
|
// want. But it doesn't check for the order. Note that want can contain
|
|
// duplicate items, which makes it weight-round-robin.
|
|
//
|
|
// Step 1. the return values of f should form a permutation of all elements in
|
|
// want, but not necessary in the same order. E.g. if want is {a,a,b}, the check
|
|
// fails if f returns:
|
|
// - {a,a,a}: third a is returned before b
|
|
// - {a,b,b}: second b is returned before the second a
|
|
//
|
|
// If error is found in this step, the returned error contains only the first
|
|
// iteration until where it goes wrong.
|
|
//
|
|
// Step 2. the return values of f should be repetitions of the same permutation.
|
|
// E.g. if want is {a,a,b}, the check fails if f returns:
|
|
// - {a,b,a,b,a,a}: though it satisfies step 1, the second iteration is not
|
|
// repeating the first iteration.
|
|
//
|
|
// If error is found in this step, the returned error contains the first
|
|
// iteration + the second iteration until where it goes wrong.
|
|
func IsRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error {
|
|
wantSet := make(map[balancer.SubConn]int) // SubConn -> count, for weighted RR.
|
|
for _, sc := range want {
|
|
wantSet[sc]++
|
|
}
|
|
|
|
// The first iteration: makes sure f's return values form a permutation of
|
|
// elements in want.
|
|
//
|
|
// Also keep the returns values in a slice, so we can compare the order in
|
|
// the second iteration.
|
|
gotSliceFirstIteration := make([]balancer.SubConn, 0, len(want))
|
|
for range want {
|
|
got := f()
|
|
gotSliceFirstIteration = append(gotSliceFirstIteration, got)
|
|
wantSet[got]--
|
|
if wantSet[got] < 0 {
|
|
return fmt.Errorf("non-roundrobin want: %v, result: %v", want, gotSliceFirstIteration)
|
|
}
|
|
}
|
|
|
|
// The second iteration should repeat the first iteration.
|
|
var gotSliceSecondIteration []balancer.SubConn
|
|
for i := 0; i < 2; i++ {
|
|
for _, w := range gotSliceFirstIteration {
|
|
g := f()
|
|
gotSliceSecondIteration = append(gotSliceSecondIteration, g)
|
|
if w != g {
|
|
return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v", gotSliceFirstIteration, gotSliceSecondIteration)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SubConnFromPicker returns a function which returns a SubConn by calling the
|
|
// Pick() method of the provided picker. There is no caching of SubConns here.
|
|
// Every invocation of the returned function results in a new pick.
|
|
func SubConnFromPicker(p balancer.Picker) func() balancer.SubConn {
|
|
return func() balancer.SubConn {
|
|
scst, _ := p.Pick(balancer.PickInfo{})
|
|
return scst.SubConn
|
|
}
|
|
}
|
|
|
|
// ErrTestConstPicker is the error returned by the test const picker.
var ErrTestConstPicker = errors.New("const picker error")
|
|
|
|
// TestConstPicker is a const picker for tests.
|
|
type TestConstPicker struct {
|
|
Err error
|
|
SC balancer.SubConn
|
|
}
|
|
|
|
// Pick returns the const SubConn or the error.
|
|
func (tcp *TestConstPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
|
|
if tcp.Err != nil {
|
|
return balancer.PickResult{}, tcp.Err
|
|
}
|
|
return balancer.PickResult{SubConn: tcp.SC}, nil
|
|
}
|