grpc: delete deprecated API WithBalancerName() (#5232)

This commit is contained in:
Easwar Swaminathan 2022-03-10 13:15:11 -08:00 committed by GitHub
parent cf6d4d5ab7
commit eead9a824c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 255 additions and 447 deletions

View File

@ -26,7 +26,7 @@ import (
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
@ -161,7 +161,7 @@ func (s) TestSwitchBalancer(t *testing.T) {
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -184,38 +184,11 @@ func (s) TestSwitchBalancer(t *testing.T) {
}
}
// Test that balancer specified by dial option will not be overridden.
func (s) TestBalancerDialOption(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
const numServers = 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}
// First addr update contains grpclb.
func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -275,7 +248,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -351,7 +324,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -413,7 +386,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -503,7 +476,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -550,7 +523,7 @@ func init() {
// This test is to make sure this close doesn't cause a deadlock.
func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}

View File

@ -674,7 +674,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
}
var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}
@ -714,10 +714,6 @@ func (cc *ClientConn) switchBalancer(name string) {
}
channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
return
}
if cc.balancerWrapper != nil {
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
@ -999,35 +995,28 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {

View File

@ -20,6 +20,7 @@ package grpc
import (
"context"
"fmt"
"net"
"sync"
"testing"
@ -28,6 +29,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
@ -141,9 +143,6 @@ client enters TRANSIENT FAILURE.`,
}
func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
pl := testutils.NewPipeListener()
defer pl.Close()
@ -156,10 +155,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
connMu.Unlock()
}()
client, err := DialContext(ctx,
"",
WithInsecure(),
WithBalancerName(stateRecordingBalancerName),
client, err := Dial("",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithDialer(pl.Dialer()),
withBackoff(noBackoff{}),
withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 }))
@ -170,12 +168,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
go stayConnected(client)
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(5 * time.Second)
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
@ -196,16 +191,6 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
// When a READY connection is closed, the client enters IDLE then CONNECTING.
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
@ -237,7 +222,9 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
conn.Close()
}()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
client, err := Dial(lis.Addr().String(),
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
if err != nil {
t.Fatal(err)
}
@ -246,11 +233,15 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(5 * time.Second)
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
@ -266,14 +257,6 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
// When the first connection is closed, the client stays in CONNECTING until it
// tries the second address (which succeeds, and then it enters READY).
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
@ -324,19 +307,25 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
client, err := Dial("whatever:///this-gets-overwritten",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(5 * time.Second)
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
@ -345,12 +334,12 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
}
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
case <-server2Done:
}
@ -359,16 +348,6 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter CONNECTING
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
@ -414,7 +393,10 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
client, err := Dial("whatever:///this-gets-overwritten",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
@ -422,12 +404,17 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
go stayConnected(client)
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(2 * time.Second)
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
@ -439,7 +426,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
}
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}

View File

@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
@ -69,7 +70,7 @@ func (s) TestDialWithTimeout(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}})
client, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithTimeout(5*time.Second))
client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithTimeout(5*time.Second))
close(dialDone)
if err != nil {
t.Fatalf("Dial failed. Err: %v", err)
@ -121,7 +122,7 @@ func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{lis1Addr, lis2Addr}})
client, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("Dial failed. Err: %v", err)
}
@ -171,7 +172,7 @@ func (s) TestDialWaitsForServerSettings(t *testing.T) {
}()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
client, err := DialContext(ctx, lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithBlock())
close(dialDone)
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
@ -209,7 +210,7 @@ func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) {
defer cancel()
client, err := DialContext(ctx,
lis.Addr().String(),
WithInsecure(),
WithTransportCredentials(insecure.NewCredentials()),
WithReturnConnectionError(),
withBackoff(noBackoff{}),
withMinConnectDeadline(func() time.Duration { return time.Second / 4 }))
@ -286,7 +287,7 @@ func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
break
}
}()
client, err := Dial(lis.Addr().String(), WithInsecure(), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 500 }))
client, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 500 }))
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
}
@ -342,7 +343,7 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
prevAt = meow
}
}()
cc, err := Dial(lis.Addr().String(), WithInsecure())
cc, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
}
@ -352,7 +353,10 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
}
func (s) TestWithTimeout(t *testing.T) {
conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure())
conn, err := Dial("passthrough:///Non-Existent.Server:80",
WithTimeout(time.Millisecond),
WithBlock(),
WithTransportCredentials(insecure.NewCredentials()))
if err == nil {
conn.Close()
}
@ -439,8 +443,8 @@ func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten",
WithInsecure(),
WithBalancerName(stateRecordingBalancerName),
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb),
withMinConnectDeadline(getMinConnectTimeout))
if err != nil {
@ -466,7 +470,7 @@ func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
func (s) TestDialContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure()); err != context.Canceled {
if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials())); err != context.Canceled {
t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
}
}
@ -484,7 +488,7 @@ func (s) TestDialContextFailFast(t *testing.T) {
return nil, failErr
}
_, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithDialer(dialer), FailOnNonTempDialError(true))
_, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), FailOnNonTempDialError(true))
if terr, ok := err.(transport.ConnectionError); !ok || terr.Origin() != failErr {
t.Fatalf("DialContext() = _, %v, want _, %v", err, failErr)
}
@ -529,7 +533,7 @@ func (s) TestCredentialsMisuse(t *testing.T) {
// Use of perRPC creds requiring transport security over an insecure
// transport must fail.
if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithInsecure()); err != errTransportCredentialsMissing {
if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithTransportCredentials(insecure.NewCredentials())); err != errTransportCredentialsMissing {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
}
@ -573,7 +577,7 @@ func (s) TestWithConnectParams(t *testing.T) {
}
func testBackoffConfigSet(t *testing.T, wantBackoff internalbackoff.Exponential, opts ...DialOption) {
opts = append(opts, WithInsecure())
opts = append(opts, WithTransportCredentials(insecure.NewCredentials()))
conn, err := Dial("passthrough:///foo:80", opts...)
if err != nil {
t.Fatalf("unexpected error dialing connection: %v", err)
@ -597,7 +601,7 @@ func testBackoffConfigSet(t *testing.T, wantBackoff internalbackoff.Exponential,
func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) {
// Default value specified for minConnectTimeout in the spec is 20 seconds.
mct := 1 * time.Minute
conn, err := Dial("passthrough:///foo:80", WithInsecure(), WithConnectParams(ConnectParams{MinConnectTimeout: mct}))
conn, err := Dial("passthrough:///foo:80", WithTransportCredentials(insecure.NewCredentials()), WithConnectParams(ConnectParams{MinConnectTimeout: mct}))
if err != nil {
t.Fatalf("unexpected error dialing connection: %v", err)
}
@ -611,7 +615,7 @@ func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) {
func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -628,7 +632,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
for i := 0; i < 10; i++ { // Run this multiple times to make sure it doesn't panic.
r := manual.NewBuilderWithScheme(fmt.Sprintf("whatever-%d", i))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -641,7 +645,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
func (s) TestResolverEmptyUpdateNotPanic(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -691,7 +695,7 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
addr := lis.Addr().String()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
cc, err := DialContext(ctx, addr, WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 100 * time.Millisecond,
PermitWithoutStream: true,
@ -720,7 +724,7 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
func (s) TestDisableServiceConfigOption(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
addr := r.Scheme() + ":///non.existent"
cc, err := Dial(addr, WithInsecure(), WithResolvers(r), WithDisableServiceConfig())
cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDisableServiceConfig())
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
@ -747,7 +751,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
func (s) TestMethodConfigDefaultService(t *testing.T) {
addr := "nonexist:///non.existent"
cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(`{
cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDefaultServiceConfig(`{
"methodConfig": [{
"name": [
{
@ -770,7 +774,7 @@ func (s) TestMethodConfigDefaultService(t *testing.T) {
func (s) TestGetClientConnTarget(t *testing.T) {
addr := "nonexist:///non.existent"
cc, err := Dial(addr, WithInsecure())
cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
@ -796,7 +800,7 @@ func (s) TestResetConnectBackoff(t *testing.T) {
dials <- struct{}{}
return nil, errors.New("failed to fake dial")
}
cc, err := Dial("any", WithInsecure(), WithDialer(dialer), withBackoff(backoffForever{}))
cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), withBackoff(backoffForever{}))
if err != nil {
t.Fatalf("Dial() = _, %v; want _, nil", err)
}
@ -825,7 +829,7 @@ func (s) TestResetConnectBackoff(t *testing.T) {
func (s) TestBackoffCancel(t *testing.T) {
dialStrCh := make(chan string)
cc, err := Dial("any", WithInsecure(), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
dialStrCh <- t
return nil, fmt.Errorf("test dialer, always error")
}))
@ -956,10 +960,10 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {
rb.InitialState(resolver.State{Addresses: addrsList})
client, err := Dial("whatever:///this-gets-overwritten",
WithInsecure(),
WithTransportCredentials(insecure.NewCredentials()),
WithResolvers(rb),
withBackoff(noBackoff{}),
WithBalancerName(stateRecordingBalancerName),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
withMinConnectDeadline(func() time.Duration { return time.Hour }))
if err != nil {
t.Fatal(err)
@ -1044,14 +1048,14 @@ func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool {
}
func testInvalidDefaultServiceConfig(t *testing.T) {
_, err := Dial("fake.com", WithInsecure(), WithDefaultServiceConfig(""))
_, err := Dial("fake.com", WithTransportCredentials(insecure.NewCredentials()), WithDefaultServiceConfig(""))
if !strings.Contains(err.Error(), invalidDefaultServiceConfigErrPrefix) {
t.Fatalf("Dial got err: %v, want err contains: %v", err, invalidDefaultServiceConfigErrPrefix)
}
}
func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r *manual.Resolver, addr string, js string) {
cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithResolvers(r), WithDefaultServiceConfig(js))
cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDisableServiceConfig(), WithResolvers(r), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
@ -1067,7 +1071,7 @@ func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r *
}
func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
cc, err := Dial(addr, WithInsecure(), WithResolvers(r), WithDefaultServiceConfig(js))
cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
@ -1081,7 +1085,7 @@ func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T
}
func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
cc, err := Dial(addr, WithInsecure(), WithResolvers(r), WithDefaultServiceConfig(js))
cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}

View File

@ -20,12 +20,10 @@ package grpc
import (
"context"
"fmt"
"net"
"time"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
@ -46,18 +44,16 @@ type dialOptions struct {
chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor
cp Compressor
dc Decompressor
bs internalbackoff.Strategy
block bool
returnLastError bool
timeout time.Duration
scChan <-chan ServiceConfig
authority string
copts transport.ConnectOptions
callOptions []CallOption
// This is used by WithBalancerName dial option.
balancerBuilder balancer.Builder
cp Compressor
dc Decompressor
bs internalbackoff.Strategy
block bool
returnLastError bool
timeout time.Duration
scChan <-chan ServiceConfig
authority string
copts transport.ConnectOptions
callOptions []CallOption
channelzParentID *channelz.Identifier
disableServiceConfig bool
disableRetry bool
@ -196,25 +192,6 @@ func WithDecompressor(dc Decompressor) DialOption {
})
}
// WithBalancerName sets the balancer that the ClientConn will be initialized
// with. Balancer registered with balancerName will be used. This function
// panics if no balancer was registered by balancerName.
//
// The balancer cannot be overridden by balancer option specified by service
// config.
//
// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig
// instead. Will be removed in a future 1.x release.
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = builder
})
}
// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//

View File

@ -153,7 +153,7 @@ func (s) TestCredsBundleFromBalancer(t *testing.T) {
te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: ""})
te.tapHandle = authHandle
te.customDialOptions = []grpc.DialOption{
grpc.WithBalancerName(testBalancerName),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName)),
}
creds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
if err != nil {
@ -188,7 +188,7 @@ func testPickExtraMetadata(t *testing.T, e env) {
)
te.customDialOptions = []grpc.DialOption{
grpc.WithBalancerName(testBalancerName),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName)),
grpc.WithUserAgent(testUserAgent),
}
te.startServer(&testServer{security: e.security})
@ -236,7 +236,7 @@ func testDoneInfo(t *testing.T, e env) {
b := &testBalancer{}
balancer.Register(b)
te.customDialOptions = []grpc.DialOption{
grpc.WithBalancerName(testBalancerName),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName)),
}
te.userAgent = failAppUA
te.startServer(&testServer{security: e.security})
@ -315,7 +315,7 @@ func testDoneLoads(t *testing.T) {
return &testpb.Empty{}, nil
},
}
if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil {
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName))); err != nil {
t.Fatalf("error starting testing server: %v", err)
}
defer ss.Stop()
@ -393,8 +393,10 @@ func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) {
b := newTestBalancerKeepAddresses()
balancer.Register(b)
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r),
grpc.WithBalancerName(b.Name()))
cc, err := grpc.Dial(r.Scheme()+":///test.server",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, b.Name())))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}

View File

@ -812,7 +812,7 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
scheme = te.resolverScheme + ":///"
}
if te.e.balancer != "" {
opts = append(opts, grpc.WithBalancerName(te.e.balancer))
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer)))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))

View File

@ -49,7 +49,7 @@ func newTestHealthServer() *testHealthServer {
return newTestHealthServerWithWatchFunc(defaultWatchFunc)
}
func newTestHealthServerWithWatchFunc(f func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error) *testHealthServer {
func newTestHealthServerWithWatchFunc(f healthWatchFunc) *testHealthServer {
return &testHealthServer{
watchFunc: f,
update: make(chan struct{}, 1),
@ -83,9 +83,11 @@ func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stre
return nil
}
type healthWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
type testHealthServer struct {
healthpb.UnimplementedHealthServer
watchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
watchFunc healthWatchFunc
mu sync.Mutex
status map[string]healthpb.HealthCheckResponse_ServingStatus
update chan struct{}
@ -125,25 +127,26 @@ func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struc
return
}
type svrConfig struct {
specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
}
func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Listener, *testHealthServer) {
t.Helper()
func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) {
s = grpc.NewServer()
lis, err = net.Listen("tcp", "localhost:0")
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err)
t.Fatalf("net.Listen() failed: %v", err)
}
if sc.specialWatchFunc != nil {
ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc)
var ts *testHealthServer
if watchFunc != nil {
ts = newTestHealthServerWithWatchFunc(watchFunc)
} else {
ts = newTestHealthServer()
}
s := grpc.NewServer()
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
return s, lis, ts, s.Stop, nil
t.Cleanup(func() { s.Stop() })
return s, lis, ts
}
type clientConfig struct {
@ -152,28 +155,34 @@ type clientConfig struct {
extraDialOption []grpc.DialOption
}
func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) {
r = manual.NewBuilderWithScheme("whatever")
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r), grpc.WithBalancerName(c.balancerName))
if c.testHealthCheckFuncWrapper != nil {
opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
}
opts = append(opts, c.extraDialOption...)
cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...)
if err != nil {
func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resolver) {
t.Helper()
return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err)
r := manual.NewBuilderWithScheme("whatever")
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
}
return cc, r, func() { cc.Close() }, nil
if c != nil {
if c.balancerName != "" {
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, c.balancerName)))
}
if c.testHealthCheckFuncWrapper != nil {
opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
}
opts = append(opts, c.extraDialOption...)
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", opts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
return cc, r
}
func (s) TestHealthCheckWatchStateChange(t *testing.T) {
_, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil {
t.Fatal(err)
}
_, lis, ts := setupServer(t, nil)
// The table below shows the expected series of addrConn connectivity transitions when server
// updates its health status. As there's only one addrConn corresponds with the ClientConn in this
@ -189,20 +198,17 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) {
//+------------------------------+-------------------------------------------+
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
cc, r := setupClient(t, nil)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
t.Fatal("ClientConn is still in IDLE state when the context times out.")
@ -258,22 +264,18 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
go s.Serve(lis)
defer s.Stop()
cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
cc, r := setupClient(t, nil)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
t.Fatal("ClientConn is still in IDLE state when the context times out.")
}
@ -288,37 +290,23 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
// In the case of a goaway received, the health check stream should be terminated and health check
// function should exit.
func (s) TestHealthCheckWithGoAway(t *testing.T) {
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil {
t.Fatal(err)
}
s, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) {
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
@ -380,33 +368,19 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) {
}
func (s) TestHealthCheckWithConnClose(t *testing.T) {
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil {
t.Fatal(err)
}
s, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -444,30 +418,17 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) {
// addrConn drain happens when addrConn gets torn down due to its address being no longer in the
// address list returned by the resolver.
func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil {
t.Fatal(err)
}
_, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
tc := testpb.NewTestServiceClient(cc)
sc := parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
@ -537,32 +498,19 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
// ClientConn close will lead to its addrConns being torn down.
func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil {
t.Fatal(err)
}
_, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -602,39 +550,25 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
// closes the skipReset channel(since it has not been closed inside health check func) to unblock
// onGoAway/onClose goroutine.
func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *testing.T) {
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, lis, ts, deferFunc, err := setupServer(&svrConfig{
specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "delay" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"delay\"")
}
// Do nothing to mock a delay of health check response from server side.
// This case is to help with the test that covers the condition that setConnectivityState is not
// called inside HealthCheckFunc before the func returns.
select {
case <-stream.Context().Done():
case <-time.After(5 * time.Second):
}
return nil
},
})
defer deferFunc()
if err != nil {
t.Fatal(err)
watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "delay" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"delay\"")
}
// Do nothing to mock a delay of health check response from server side.
// This case is to help with the test that covers the condition that setConnectivityState is not
// called inside HealthCheckFunc before the func returns.
select {
case <-stream.Context().Done():
case <-time.After(5 * time.Second):
}
return nil
}
_, lis, ts := setupServer(t, watchFunc)
ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
_, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
@ -642,7 +576,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
sc := parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "delay"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
@ -678,39 +613,25 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
// closes the allowedToReset channel(since it has not been closed inside health check func) to unblock
// onGoAway/onClose goroutine.
func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
s, lis, ts, deferFunc, err := setupServer(&svrConfig{
specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "delay" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"delay\"")
}
// Do nothing to mock a delay of health check response from server side.
// This case is to help with the test that covers the condition that setConnectivityState is not
// called inside HealthCheckFunc before the func returns.
select {
case <-stream.Context().Done():
case <-time.After(5 * time.Second):
}
return nil
},
})
defer deferFunc()
if err != nil {
t.Fatal(err)
watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "delay" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"delay\"")
}
// Do nothing to mock a delay of health check response from server side.
// This case is to help with the test that covers the condition that setConnectivityState is not
// called inside HealthCheckFunc before the func returns.
select {
case <-stream.Context().Done():
case <-time.After(5 * time.Second):
}
return nil
}
s, lis, ts := setupServer(t, watchFunc)
ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
_, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
@ -720,7 +641,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "delay"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
select {
@ -750,25 +672,18 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
cc, r := setupClient(t, &clientConfig{
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -792,24 +707,17 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "pick_first",
cc, r := setupClient(t, &clientConfig{
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
},
"loadBalancingConfig": [{"pick_first":{}}]
}`)})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -833,18 +741,8 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -867,11 +765,7 @@ func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
}
func (s) TestHealthCheckDisable(t *testing.T) {
_, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil {
t.Fatal(err)
}
_, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
// test client side disabling configuration.
@ -881,32 +775,23 @@ func (s) TestHealthCheckDisable(t *testing.T) {
}
func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
_, lis, _, deferFunc, err := setupServer(&svrConfig{
specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "channelzSuccess" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzSuccess\"")
}
return status.Error(codes.OK, "fake success")
},
})
defer deferFunc()
if err != nil {
t.Fatal(err)
watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "channelzSuccess" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzSuccess\"")
}
return status.Error(codes.OK, "fake success")
}
_, lis, _ := setupServer(t, watchFunc)
_, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
_, r := setupClient(t, nil)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "channelzSuccess"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
if err := verifyResultWithDelay(func() (bool, error) {
@ -937,32 +822,23 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
}
func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
_, lis, _, deferFunc, err := setupServer(&svrConfig{
specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "channelzFailure" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzFailure\"")
}
return status.Error(codes.Internal, "fake failure")
},
})
if err != nil {
t.Fatal(err)
watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "channelzFailure" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzFailure\"")
}
return status.Error(codes.Internal, "fake failure")
}
defer deferFunc()
_, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
_, lis, _ := setupServer(t, watchFunc)
_, r := setupClient(t, nil)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "channelzFailure"
}
},
"loadBalancingConfig": [{"round_robin":{}}]
}`)})
if err := verifyResultWithDelay(func() (bool, error) {