mirror of https://github.com/grpc/grpc-go.git
700 lines
20 KiB
Go
700 lines
20 KiB
Go
/*
|
|
*
|
|
* Copyright 2018 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"golang.org/x/net/http2"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/backoff"
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/internal"
|
|
"google.golang.org/grpc/internal/balancer/stub"
|
|
"google.golang.org/grpc/internal/envconfig"
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
"google.golang.org/grpc/internal/stubserver"
|
|
"google.golang.org/grpc/internal/testutils"
|
|
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
|
testpb "google.golang.org/grpc/interop/grpc_testing"
|
|
"google.golang.org/grpc/resolver"
|
|
"google.golang.org/grpc/resolver/manual"
|
|
)
|
|
|
|
const stateRecordingBalancerName = "state_recording_balancer"
|
|
|
|
var testBalancerBuilder = newStateRecordingBalancerBuilder()
|
|
|
|
func init() {
|
|
balancer.Register(testBalancerBuilder)
|
|
}
|
|
|
|
// These tests use a pipeListener. This listener is similar to net.Listener
|
|
// except that it is unbuffered, so each read and write will wait for the other
|
|
// side's corresponding write or read.
|
|
func (s) TestStateTransitions_SingleAddress(t *testing.T) {
|
|
for _, test := range []struct {
|
|
desc string
|
|
want []connectivity.State
|
|
server func(net.Listener) net.Conn
|
|
}{
|
|
{
|
|
desc: "When the server returns server preface, the client enters READY.",
|
|
want: []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
},
|
|
server: func(lis net.Listener) net.Conn {
|
|
conn, err := lis.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return nil
|
|
}
|
|
|
|
go keepReading(conn)
|
|
|
|
framer := http2.NewFramer(conn, conn)
|
|
if err := framer.WriteSettings(http2.Setting{}); err != nil {
|
|
t.Errorf("Error while writing settings frame. %v", err)
|
|
return nil
|
|
}
|
|
|
|
return conn
|
|
},
|
|
},
|
|
{
|
|
desc: "When the connection is closed before the preface is sent, the client enters TRANSIENT FAILURE.",
|
|
want: []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.TransientFailure,
|
|
},
|
|
server: func(lis net.Listener) net.Conn {
|
|
conn, err := lis.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return nil
|
|
}
|
|
|
|
conn.Close()
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
desc: `When the server sends its connection preface, but the connection dies before the client can write its
|
|
connection preface, the client enters TRANSIENT FAILURE.`,
|
|
want: []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.TransientFailure,
|
|
},
|
|
server: func(lis net.Listener) net.Conn {
|
|
conn, err := lis.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return nil
|
|
}
|
|
|
|
framer := http2.NewFramer(conn, conn)
|
|
if err := framer.WriteSettings(http2.Setting{}); err != nil {
|
|
t.Errorf("Error while writing settings frame. %v", err)
|
|
return nil
|
|
}
|
|
|
|
conn.Close()
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
desc: `When the server reads the client connection preface but does not send its connection preface, the
|
|
client enters TRANSIENT FAILURE.`,
|
|
want: []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.TransientFailure,
|
|
},
|
|
server: func(lis net.Listener) net.Conn {
|
|
conn, err := lis.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return nil
|
|
}
|
|
|
|
go keepReading(conn)
|
|
|
|
return conn
|
|
},
|
|
},
|
|
} {
|
|
t.Log(test.desc)
|
|
testStateTransitionSingleAddress(t, test.want, test.server)
|
|
}
|
|
}
|
|
|
|
func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
|
|
pl := testutils.NewPipeListener()
|
|
defer pl.Close()
|
|
|
|
// Launch the server.
|
|
var conn net.Conn
|
|
var connMu sync.Mutex
|
|
go func() {
|
|
connMu.Lock()
|
|
conn = server(pl)
|
|
connMu.Unlock()
|
|
}()
|
|
|
|
client, err := grpc.NewClient("passthrough:///",
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
|
|
grpc.WithDialer(pl.Dialer()),
|
|
grpc.WithConnectParams(grpc.ConnectParams{
|
|
Backoff: backoff.Config{},
|
|
MinConnectTimeout: 100 * time.Millisecond,
|
|
}))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
go testutils.StayConnected(ctx, client)
|
|
|
|
// Wait for the test balancer to be built before capturing it's state
|
|
// notification channel.
|
|
testutils.AwaitNotState(ctx, t, client, connectivity.Idle)
|
|
stateNotifications := testBalancerBuilder.nextStateNotifier()
|
|
for i := 0; i < len(want); i++ {
|
|
select {
|
|
case <-time.After(defaultTestTimeout):
|
|
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
|
|
case seen := <-stateNotifications:
|
|
if seen != want[i] {
|
|
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
|
|
}
|
|
}
|
|
}
|
|
|
|
connMu.Lock()
|
|
defer connMu.Unlock()
|
|
if conn != nil {
|
|
err = conn.Close()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// When a READY connection is closed, the client enters IDLE then CONNECTING.
|
|
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
|
|
lis, err := net.Listen("tcp", "localhost:0")
|
|
if err != nil {
|
|
t.Fatalf("Error while listening. Err: %v", err)
|
|
}
|
|
defer lis.Close()
|
|
|
|
sawReady := make(chan struct{}, 1)
|
|
defer close(sawReady)
|
|
|
|
// Launch the server.
|
|
go func() {
|
|
conn, err := lis.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return
|
|
}
|
|
|
|
go keepReading(conn)
|
|
|
|
framer := http2.NewFramer(conn, conn)
|
|
if err := framer.WriteSettings(http2.Setting{}); err != nil {
|
|
t.Errorf("Error while writing settings frame. %v", err)
|
|
return
|
|
}
|
|
|
|
// Prevents race between onPrefaceReceipt and onClose.
|
|
<-sawReady
|
|
|
|
conn.Close()
|
|
}()
|
|
|
|
client, err := grpc.NewClient(lis.Addr().String(),
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
go testutils.StayConnected(ctx, client)
|
|
testutils.AwaitNotState(ctx, t, client, connectivity.Idle)
|
|
stateNotifications := testBalancerBuilder.nextStateNotifier()
|
|
|
|
want := []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
connectivity.Idle,
|
|
connectivity.Connecting,
|
|
}
|
|
for i := 0; i < len(want); i++ {
|
|
select {
|
|
case <-time.After(defaultTestTimeout):
|
|
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
|
|
case seen := <-stateNotifications:
|
|
if seen == connectivity.Ready {
|
|
sawReady <- struct{}{}
|
|
}
|
|
if seen != want[i] {
|
|
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// When the first connection is closed, the client stays in CONNECTING until it
|
|
// tries the second address (which succeeds, and then it enters READY).
|
|
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
|
|
lis1, err := net.Listen("tcp", "localhost:0")
|
|
if err != nil {
|
|
t.Fatalf("Error while listening. Err: %v", err)
|
|
}
|
|
defer lis1.Close()
|
|
|
|
lis2, err := net.Listen("tcp", "localhost:0")
|
|
if err != nil {
|
|
t.Fatalf("Error while listening. Err: %v", err)
|
|
}
|
|
defer lis2.Close()
|
|
|
|
server1Done := make(chan struct{})
|
|
server2Done := make(chan struct{})
|
|
|
|
// Launch server 1.
|
|
go func() {
|
|
conn, err := lis1.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return
|
|
}
|
|
|
|
conn.Close()
|
|
close(server1Done)
|
|
}()
|
|
// Launch server 2.
|
|
go func() {
|
|
conn, err := lis2.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return
|
|
}
|
|
|
|
go keepReading(conn)
|
|
|
|
framer := http2.NewFramer(conn, conn)
|
|
if err := framer.WriteSettings(http2.Setting{}); err != nil {
|
|
t.Errorf("Error while writing settings frame. %v", err)
|
|
return
|
|
}
|
|
|
|
close(server2Done)
|
|
}()
|
|
|
|
rb := manual.NewBuilderWithScheme("whatever")
|
|
rb.InitialState(resolver.State{Addresses: []resolver.Address{
|
|
{Addr: lis1.Addr().String()},
|
|
{Addr: lis2.Addr().String()},
|
|
}})
|
|
client, err := grpc.NewClient("whatever:///this-gets-overwritten",
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
|
|
grpc.WithConnectParams(grpc.ConnectParams{
|
|
// Set a really long back-off delay to ensure the first subConn does
|
|
// not enter IDLE before the second subConn connects.
|
|
Backoff: backoff.Config{
|
|
BaseDelay: 1 * time.Hour,
|
|
},
|
|
}),
|
|
grpc.WithResolvers(rb))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
client.Connect()
|
|
stateNotifications := testBalancerBuilder.nextStateNotifier()
|
|
want := []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
}
|
|
if envconfig.NewPickFirstEnabled {
|
|
want = []connectivity.State{
|
|
// The first subconn fails.
|
|
connectivity.Connecting,
|
|
connectivity.TransientFailure,
|
|
// The second subconn connects.
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
}
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
for i := 0; i < len(want); i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
|
|
case seen := <-stateNotifications:
|
|
if seen != want[i] {
|
|
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
|
|
}
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
|
|
case <-server1Done:
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
|
|
case <-server2Done:
|
|
}
|
|
}
|
|
|
|
// When there are multiple addresses, and we enter READY on one of them, a
|
|
// later closure should cause the client to enter CONNECTING
|
|
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
|
|
lis1, err := net.Listen("tcp", "localhost:0")
|
|
if err != nil {
|
|
t.Fatalf("Error while listening. Err: %v", err)
|
|
}
|
|
defer lis1.Close()
|
|
|
|
// Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
|
|
lis2, err := net.Listen("tcp", "localhost:0")
|
|
if err != nil {
|
|
t.Fatalf("Error while listening. Err: %v", err)
|
|
}
|
|
defer lis2.Close()
|
|
|
|
server1Done := make(chan struct{})
|
|
sawReady := make(chan struct{}, 1)
|
|
defer close(sawReady)
|
|
|
|
// Launch server 1.
|
|
go func() {
|
|
conn, err := lis1.Accept()
|
|
if err != nil {
|
|
t.Error(err)
|
|
return
|
|
}
|
|
|
|
go keepReading(conn)
|
|
|
|
framer := http2.NewFramer(conn, conn)
|
|
if err := framer.WriteSettings(http2.Setting{}); err != nil {
|
|
t.Errorf("Error while writing settings frame. %v", err)
|
|
return
|
|
}
|
|
|
|
<-sawReady
|
|
|
|
conn.Close()
|
|
|
|
close(server1Done)
|
|
}()
|
|
|
|
rb := manual.NewBuilderWithScheme("whatever")
|
|
rb.InitialState(resolver.State{Addresses: []resolver.Address{
|
|
{Addr: lis1.Addr().String()},
|
|
{Addr: lis2.Addr().String()},
|
|
}})
|
|
client, err := grpc.NewClient("whatever:///this-gets-overwritten",
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
|
|
grpc.WithResolvers(rb))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
client.Connect()
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
go testutils.StayConnected(ctx, client)
|
|
|
|
stateNotifications := testBalancerBuilder.nextStateNotifier()
|
|
want := []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
connectivity.Idle,
|
|
connectivity.Connecting,
|
|
}
|
|
for i := 0; i < len(want); i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
|
|
case seen := <-stateNotifications:
|
|
if seen == connectivity.Ready {
|
|
sawReady <- struct{}{}
|
|
}
|
|
if seen != want[i] {
|
|
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
|
|
}
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
|
|
case <-server1Done:
|
|
}
|
|
}
|
|
|
|
type stateRecordingBalancer struct {
|
|
balancer.Balancer
|
|
}
|
|
|
|
func (b *stateRecordingBalancer) Close() {
|
|
b.Balancer.Close()
|
|
}
|
|
|
|
type stateRecordingBalancerBuilder struct {
|
|
mu sync.Mutex
|
|
notifier chan connectivity.State // The notifier used in the last Balancer.
|
|
}
|
|
|
|
func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
|
|
return &stateRecordingBalancerBuilder{}
|
|
}
|
|
|
|
func (b *stateRecordingBalancerBuilder) Name() string {
|
|
return stateRecordingBalancerName
|
|
}
|
|
|
|
func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
|
stateNotifications := make(chan connectivity.State, 10)
|
|
b.mu.Lock()
|
|
b.notifier = stateNotifications
|
|
b.mu.Unlock()
|
|
return &stateRecordingBalancer{
|
|
Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
|
|
}
|
|
}
|
|
|
|
func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
ret := b.notifier
|
|
b.notifier = nil
|
|
return ret
|
|
}
|
|
|
|
type stateRecordingCCWrapper struct {
|
|
balancer.ClientConn
|
|
notifier chan<- connectivity.State
|
|
}
|
|
|
|
func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
|
oldListener := opts.StateListener
|
|
opts.StateListener = func(s balancer.SubConnState) {
|
|
ccw.notifier <- s.ConnectivityState
|
|
oldListener(s)
|
|
}
|
|
return ccw.ClientConn.NewSubConn(addrs, opts)
|
|
}
|
|
|
|
// Keep reading until something causes the connection to die (EOF, server
|
|
// closed, etc). Useful as a tool for mindlessly keeping the connection
|
|
// healthy, since the client will error if things like client prefaces are not
|
|
// accepted in a timely fashion.
|
|
func keepReading(conn net.Conn) {
|
|
buf := make([]byte, 1024)
|
|
for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
|
|
}
|
|
}
|
|
|
|
type funcConnectivityStateSubscriber struct {
|
|
onMsg func(connectivity.State)
|
|
}
|
|
|
|
func (f *funcConnectivityStateSubscriber) OnMessage(msg any) {
|
|
f.onMsg(msg.(connectivity.State))
|
|
}
|
|
|
|
// TestConnectivityStateSubscriber confirms updates sent by the balancer in
|
|
// rapid succession are not missed by the subscriber.
|
|
func (s) TestConnectivityStateSubscriber(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
sendStates := []connectivity.State{
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
connectivity.Idle,
|
|
connectivity.Connecting,
|
|
connectivity.Idle,
|
|
connectivity.Connecting,
|
|
connectivity.Ready,
|
|
}
|
|
wantStates := append(sendStates, connectivity.Shutdown)
|
|
|
|
const testBalName = "any"
|
|
bf := stub.BalancerFuncs{
|
|
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
|
|
// Send the expected states in rapid succession.
|
|
for _, s := range sendStates {
|
|
t.Logf("Sending state update %s", s)
|
|
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s})
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
stub.Register(testBalName, bf)
|
|
|
|
// Create the ClientConn.
|
|
const testResName = "any"
|
|
rb := manual.NewBuilderWithScheme(testResName)
|
|
cc, err := grpc.NewClient(testResName+":///",
|
|
grpc.WithResolvers(rb),
|
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)),
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("grpc.NewClient() failed: %v", err)
|
|
}
|
|
cc.Connect()
|
|
// Subscribe to state updates. Use a buffer size of 1 to allow the
|
|
// Shutdown state to go into the channel when Close()ing.
|
|
connCh := make(chan connectivity.State, 1)
|
|
s := &funcConnectivityStateSubscriber{
|
|
onMsg: func(s connectivity.State) {
|
|
select {
|
|
case connCh <- s:
|
|
case <-ctx.Done():
|
|
}
|
|
if s == connectivity.Shutdown {
|
|
close(connCh)
|
|
}
|
|
},
|
|
}
|
|
|
|
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
|
|
|
|
// Send an update from the resolver that will trigger the LB policy's UpdateClientConnState.
|
|
go rb.UpdateState(resolver.State{})
|
|
|
|
// Verify the resulting states.
|
|
for i, want := range wantStates {
|
|
if i == len(sendStates) {
|
|
// Trigger Shutdown to be sent by the channel. Use a goroutine to
|
|
// ensure the operation does not block.
|
|
cc.Close()
|
|
}
|
|
select {
|
|
case got := <-connCh:
|
|
if got != want {
|
|
t.Errorf("Update %v was %s; want %s", i, got, want)
|
|
} else {
|
|
t.Logf("Update %v was %s as expected", i, got)
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("Timed out waiting for state update %v: %s", i, want)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestChannelStateWaitingForFirstResolverUpdate verifies the initial
|
|
// state of the channel when a manual name resolver doesn't provide any updates.
|
|
func (s) TestChannelStateWaitingForFirstResolverUpdate(t *testing.T) {
|
|
t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as Connect() is called. See issue #7686.")
|
|
|
|
backend := stubserver.StartTestService(t, nil)
|
|
defer backend.Stop()
|
|
|
|
mr := manual.NewBuilderWithScheme("e2e-test")
|
|
defer mr.Close()
|
|
|
|
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create new client: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
if state := cc.GetState(); state != connectivity.Idle {
|
|
t.Fatalf("Expected initial state to be IDLE, got %v", state)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// The channel should transition to CONNECTING automatically when Connect()
|
|
// is called.
|
|
cc.Connect()
|
|
testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
|
|
|
|
// Verify that the channel remains in CONNECTING state for a short time.
|
|
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
|
defer shortCancel()
|
|
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Connecting)
|
|
}
|
|
|
|
func (s) TestChannelStateTransitionWithRPC(t *testing.T) {
|
|
t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as an RPC call is made. See issue #7686.")
|
|
|
|
backend := stubserver.StartTestService(t, nil)
|
|
defer backend.Stop()
|
|
|
|
mr := manual.NewBuilderWithScheme("e2e-test")
|
|
defer mr.Close()
|
|
|
|
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create new client: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
if state := cc.GetState(); state != connectivity.Idle {
|
|
t.Fatalf("Expected initial state to be IDLE, got %v", state)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// Make an RPC call to transition the channel to CONNECTING.
|
|
go func() {
|
|
_, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
|
|
if err == nil {
|
|
t.Errorf("Expected RPC to fail, but it succeeded")
|
|
}
|
|
}()
|
|
|
|
// The channel should transition to CONNECTING automatically when an RPC
|
|
// is made.
|
|
testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
|
|
|
|
// The channel remains in CONNECTING state for a short time.
|
|
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
|
defer shortCancel()
|
|
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Connecting)
|
|
}
|