mirror of https://github.com/grpc/grpc-go.git
xds: fix flaky test (TestPickerUpdateAfterClose) (#4658)
This commit is contained in:
parent
fc30d5b571
commit
574137db7d
|
@ -22,6 +22,7 @@ package clusterimpl
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -30,9 +31,12 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/base"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/balancer/stub"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/resolver"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
|
@ -62,6 +66,14 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
|
||||
return func() balancer.SubConn {
|
||||
scst, _ := p.Pick(balancer.PickInfo{})
|
||||
|
@ -75,7 +87,7 @@ func init() {
|
|||
|
||||
// TestDropByCategory verifies that the balancer correctly drops the picks, and
|
||||
// that the drops are reported.
|
||||
func TestDropByCategory(t *testing.T) {
|
||||
func (s) TestDropByCategory(t *testing.T) {
|
||||
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
|
||||
xdsC := fakeclient.NewClient()
|
||||
defer xdsC.Close()
|
||||
|
@ -233,7 +245,7 @@ func TestDropByCategory(t *testing.T) {
|
|||
|
||||
// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
|
||||
// due to circuit breaking, and that the drops are reported.
|
||||
func TestDropCircuitBreaking(t *testing.T) {
|
||||
func (s) TestDropCircuitBreaking(t *testing.T) {
|
||||
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
|
||||
xdsC := fakeclient.NewClient()
|
||||
defer xdsC.Close()
|
||||
|
@ -341,10 +353,11 @@ func TestDropCircuitBreaking(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestPickerUpdateAfterClose covers the case that cluster_impl wants to update
|
||||
// picker after it's closed. Because picker updates are sent in the run()
|
||||
// goroutine.
|
||||
func TestPickerUpdateAfterClose(t *testing.T) {
|
||||
// TestPickerUpdateAfterClose covers the case where a child policy sends a
|
||||
// picker update after the cluster_impl policy is closed. Because picker updates
|
||||
// are handled in the run() goroutine, which exits before Close() returns, we
|
||||
// expect the above picker update to be dropped.
|
||||
func (s) TestPickerUpdateAfterClose(t *testing.T) {
|
||||
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
|
||||
xdsC := fakeclient.NewClient()
|
||||
defer xdsC.Close()
|
||||
|
@ -353,6 +366,30 @@ func TestPickerUpdateAfterClose(t *testing.T) {
|
|||
cc := testutils.NewTestClientConn(t)
|
||||
b := builder.Build(cc, balancer.BuildOptions{})
|
||||
|
||||
// Create a stub balancer which waits for the cluster_impl policy to be
|
||||
// closed before sending a picker update (upon receipt of a subConn state
|
||||
// change).
|
||||
closeCh := make(chan struct{})
|
||||
const childPolicyName = "stubBalancer-TestPickerUpdateAfterClose"
|
||||
stub.Register(childPolicyName, stub.BalancerFuncs{
|
||||
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
|
||||
// Create a subConn which will be used later on to test the race
|
||||
// between UpdateSubConnState() and Close().
|
||||
bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
|
||||
return nil
|
||||
},
|
||||
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
|
||||
go func() {
|
||||
// Wait for Close() to be called on the parent policy before
|
||||
// sending the picker update.
|
||||
<-closeCh
|
||||
bd.ClientConn.UpdateState(balancer.State{
|
||||
Picker: base.NewErrPicker(errors.New("dummy error picker")),
|
||||
})
|
||||
}()
|
||||
},
|
||||
})
|
||||
|
||||
var maxRequest uint32 = 50
|
||||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
|
||||
|
@ -361,7 +398,7 @@ func TestPickerUpdateAfterClose(t *testing.T) {
|
|||
EDSServiceName: testServiceName,
|
||||
MaxConcurrentRequests: &maxRequest,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
Name: childPolicyName,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
|
@ -369,23 +406,24 @@ func TestPickerUpdateAfterClose(t *testing.T) {
|
|||
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
|
||||
}
|
||||
|
||||
// Send SubConn state changes to trigger picker updates. Balancer will
|
||||
// closed in a defer.
|
||||
// Send a subConn state change to trigger a picker update. The stub balancer
|
||||
// that we use as the child policy will not send a picker update until the
|
||||
// parent policy is closed.
|
||||
sc1 := <-cc.NewSubConnCh
|
||||
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
|
||||
// This close will race with the SubConn state update.
|
||||
b.Close()
|
||||
close(closeCh)
|
||||
|
||||
select {
|
||||
case <-cc.NewPickerCh:
|
||||
t.Fatalf("unexpected picker update after balancer is closed")
|
||||
case <-time.After(time.Millisecond * 10):
|
||||
case <-time.After(defaultShortTestTimeout):
|
||||
}
|
||||
}
|
||||
|
||||
// TestClusterNameInAddressAttributes covers the case that cluster name is
|
||||
// attached to the subconn address attributes.
|
||||
func TestClusterNameInAddressAttributes(t *testing.T) {
|
||||
func (s) TestClusterNameInAddressAttributes(t *testing.T) {
|
||||
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
|
||||
xdsC := fakeclient.NewClient()
|
||||
defer xdsC.Close()
|
||||
|
@ -470,7 +508,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) {
|
|||
|
||||
// TestReResolution verifies that when a SubConn turns transient failure,
|
||||
// re-resolution is triggered.
|
||||
func TestReResolution(t *testing.T) {
|
||||
func (s) TestReResolution(t *testing.T) {
|
||||
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
|
||||
xdsC := fakeclient.NewClient()
|
||||
defer xdsC.Close()
|
||||
|
@ -547,7 +585,7 @@ func TestReResolution(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLoadReporting(t *testing.T) {
|
||||
func (s) TestLoadReporting(t *testing.T) {
|
||||
var testLocality = xdsinternal.LocalityID{
|
||||
Region: "test-region",
|
||||
Zone: "test-zone",
|
||||
|
@ -662,7 +700,7 @@ func TestLoadReporting(t *testing.T) {
|
|||
// - the init config specifies "" as the LRS server
|
||||
// - config modifies LRS server to a different string
|
||||
// - config sets LRS server to nil to stop load reporting
|
||||
func TestUpdateLRSServer(t *testing.T) {
|
||||
func (s) TestUpdateLRSServer(t *testing.T) {
|
||||
var testLocality = xdsinternal.LocalityID{
|
||||
Region: "test-region",
|
||||
Zone: "test-zone",
|
||||
|
|
Loading…
Reference in New Issue