mirror of https://github.com/grpc/grpc-go.git
cherry-pick 6610 and 6620 (#6627)
This commit is contained in:
parent
467fbf2a55
commit
fa6d9abecb
5
call.go
5
call.go
|
@ -27,11 +27,6 @@ import (
|
||||||
//
|
//
|
||||||
// All errors returned by Invoke are compatible with the status package.
|
// All errors returned by Invoke are compatible with the status package.
|
||||||
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
|
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
|
||||||
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer cc.idlenessMgr.OnCallEnd()
|
|
||||||
|
|
||||||
// allow interceptor to see all applicable call options, which means those
|
// allow interceptor to see all applicable call options, which means those
|
||||||
// configured as defaults from dial option as well as per-call options
|
// configured as defaults from dial option as well as per-call options
|
||||||
opts = combine(cc.dopts.callOptions, opts)
|
opts = combine(cc.dopts.callOptions, opts)
|
||||||
|
|
|
@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
|
||||||
ac.cancel()
|
ac.cancel()
|
||||||
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
|
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
|
||||||
|
|
||||||
// We have to defer here because GracefulClose => Close => onClose, which
|
// We have to defer here because GracefulClose => onClose, which requires
|
||||||
// requires locking ac.mu.
|
// locking ac.mu.
|
||||||
if ac.transport != nil {
|
if ac.transport != nil {
|
||||||
defer ac.transport.GracefulClose()
|
defer ac.transport.GracefulClose()
|
||||||
ac.transport = nil
|
ac.transport = nil
|
||||||
|
@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) {
|
||||||
ac.updateConnectivityState(connectivity.Shutdown, nil)
|
ac.updateConnectivityState(connectivity.Shutdown, nil)
|
||||||
ac.cancel()
|
ac.cancel()
|
||||||
ac.curAddr = resolver.Address{}
|
ac.curAddr = resolver.Address{}
|
||||||
if err == errConnDrain && curTr != nil {
|
|
||||||
// GracefulClose(...) may be executed multiple times when
|
|
||||||
// i) receiving multiple GoAway frames from the server; or
|
|
||||||
// ii) there are concurrent name resolver/Balancer triggered
|
|
||||||
// address removal and GoAway.
|
|
||||||
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
|
|
||||||
ac.mu.Unlock()
|
|
||||||
curTr.GracefulClose()
|
|
||||||
ac.mu.Lock()
|
|
||||||
}
|
|
||||||
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
|
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
|
||||||
Desc: "Subchannel deleted",
|
Desc: "Subchannel deleted",
|
||||||
Severity: channelz.CtInfo,
|
Severity: channelz.CtInfo,
|
||||||
|
@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) {
|
||||||
// being deleted right away.
|
// being deleted right away.
|
||||||
channelz.RemoveEntry(ac.channelzID)
|
channelz.RemoveEntry(ac.channelzID)
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
|
|
||||||
|
// We have to release the lock before the call to GracefulClose/Close here
|
||||||
|
// because both of them call onClose(), which requires locking ac.mu.
|
||||||
|
if curTr != nil {
|
||||||
|
if err == errConnDrain {
|
||||||
|
// Close the transport gracefully when the subConn is being shutdown.
|
||||||
|
//
|
||||||
|
// GracefulClose() may be executed multiple times if:
|
||||||
|
// - multiple GoAway frames are received from the server
|
||||||
|
// - there are concurrent name resolver or balancer triggered
|
||||||
|
// address removal and GoAway
|
||||||
|
curTr.GracefulClose()
|
||||||
|
} else {
|
||||||
|
// Hard close the transport when the channel is entering idle or is
|
||||||
|
// being shutdown. In the case where the channel is being shutdown,
|
||||||
|
// closing of transports is also taken care of by cancelation of cc.ctx.
|
||||||
|
// But in the case where the channel is entering idle, we need to
|
||||||
|
// explicitly close the transports here. Instead of distinguishing
|
||||||
|
// between these two cases, it is simpler to close the transport
|
||||||
|
// unconditionally here.
|
||||||
|
curTr.Close(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac *addrConn) getState() connectivity.State {
|
func (ac *addrConn) getState() connectivity.State {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -141,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests the case where channel idleness is enabled by passing a small value for
|
// Tests the case where channel idleness is enabled by passing a small value for
|
||||||
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
|
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
|
||||||
|
// the connection to the backend is closed.
|
||||||
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
||||||
// Create a ClientConn with a short idle_timeout.
|
// Create a ClientConn with a short idle_timeout.
|
||||||
r := manual.NewBuilderWithScheme("whatever")
|
r := manual.NewBuilderWithScheme("whatever")
|
||||||
|
@ -158,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
||||||
t.Cleanup(func() { cc.Close() })
|
t.Cleanup(func() { cc.Close() })
|
||||||
|
|
||||||
// Start a test backend and push an address update via the resolver.
|
// Start a test backend and push an address update via the resolver.
|
||||||
backend := stubserver.StartTestService(t, nil)
|
lis := testutils.NewListenerWrapper(t, nil)
|
||||||
|
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
|
||||||
t.Cleanup(backend.Stop)
|
t.Cleanup(backend.Stop)
|
||||||
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
|
||||||
|
|
||||||
|
@ -167,6 +170,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
||||||
|
|
||||||
|
// Retrieve the wrapped conn from the listener.
|
||||||
|
v, err := lis.NewConnCh.Receive(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to retrieve conn from test listener: %v", err)
|
||||||
|
}
|
||||||
|
conn := v.(*testutils.ConnWrapper)
|
||||||
|
|
||||||
// Verify that the ClientConn moves to IDLE as there is no activity.
|
// Verify that the ClientConn moves to IDLE as there is no activity.
|
||||||
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
|
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
|
||||||
|
|
||||||
|
@ -174,11 +184,46 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
||||||
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
|
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify that the previously open connection is closed.
|
||||||
|
if _, err := conn.CloseCh.Receive(ctx); err != nil {
|
||||||
|
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests the case where channel idleness is enabled by passing a small value for
|
// Tests the case where channel idleness is enabled by passing a small value for
|
||||||
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
|
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
|
||||||
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "unary",
|
||||||
|
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
|
||||||
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||||
|
return fmt.Errorf("EmptyCall RPC failed: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "streaming",
|
||||||
|
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
|
||||||
|
stream, err := client.FullDuplexCall(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("FullDuplexCall RPC failed: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := stream.Recv(); err != nil && err != io.EOF {
|
||||||
|
t.Fatalf("stream.Recv() failed: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
// Create a ClientConn with a short idle_timeout.
|
// Create a ClientConn with a short idle_timeout.
|
||||||
r := manual.NewBuilderWithScheme("whatever")
|
r := manual.NewBuilderWithScheme("whatever")
|
||||||
dopts := []grpc.DialOption{
|
dopts := []grpc.DialOption{
|
||||||
|
@ -202,6 +247,10 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
||||||
<-blockCh
|
<-blockCh
|
||||||
return &testpb.Empty{}, nil
|
return &testpb.Empty{}, nil
|
||||||
},
|
},
|
||||||
|
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
|
||||||
|
<-blockCh
|
||||||
|
return nil
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if err := backend.StartServer(); err != nil {
|
if err := backend.StartServer(); err != nil {
|
||||||
t.Fatalf("Failed to start backend: %v", err)
|
t.Fatalf("Failed to start backend: %v", err)
|
||||||
|
@ -215,35 +264,32 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
||||||
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
||||||
|
|
||||||
// Spawn a goroutine which checks expected state transitions and idleness
|
// Spawn a goroutine which checks expected state transitions and idleness
|
||||||
// channelz trace events. It eventually closes `blockCh`, thereby unblocking
|
// channelz trace events.
|
||||||
// the server RPC handler and the unary call below.
|
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(blockCh)
|
defer close(blockCh)
|
||||||
|
|
||||||
// Verify that the ClientConn stays in READY.
|
// Verify that the ClientConn stays in READY.
|
||||||
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
|
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
|
||||||
defer sCancel()
|
defer sCancel()
|
||||||
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
|
if cc.WaitForStateChange(sCtx, connectivity.Ready) {
|
||||||
|
errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Verify that there are no idleness related channelz events.
|
// Verify that there are no idleness related channelz events.
|
||||||
|
//
|
||||||
|
// TODO: Improve the checks here. If these log strings are
|
||||||
|
// changed in the code, these checks will continue to pass.
|
||||||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
|
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
|
||||||
errCh <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unblock the unary RPC on the server.
|
|
||||||
errCh <- nil
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Make a unary RPC that blocks on the server, thereby ensuring that the
|
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
|
||||||
// count of active RPCs on the client is non-zero.
|
t.Fatalf("%s rpc failed: %v", test.name, err)
|
||||||
client := testgrpc.NewTestServiceClient(cc)
|
|
||||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
|
||||||
t.Errorf("EmptyCall RPC failed: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -254,6 +300,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
|
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests the case where channel idleness is enabled by passing a small value for
|
// Tests the case where channel idleness is enabled by passing a small value for
|
||||||
|
|
15
stream.go
15
stream.go
|
@ -158,11 +158,6 @@ type ClientStream interface {
|
||||||
// If none of the above happen, a goroutine and a context will be leaked, and grpc
|
// If none of the above happen, a goroutine and a context will be leaked, and grpc
|
||||||
// will not call the optionally-configured stats handler with a stats.End message.
|
// will not call the optionally-configured stats handler with a stats.End message.
|
||||||
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
|
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
|
||||||
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer cc.idlenessMgr.OnCallEnd()
|
|
||||||
|
|
||||||
// allow interceptor to see all applicable call options, which means those
|
// allow interceptor to see all applicable call options, which means those
|
||||||
// configured as defaults from dial option as well as per-call options
|
// configured as defaults from dial option as well as per-call options
|
||||||
opts = combine(cc.dopts.callOptions, opts)
|
opts = combine(cc.dopts.callOptions, opts)
|
||||||
|
@ -179,6 +174,16 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
||||||
|
// Start tracking the RPC for idleness purposes. This is where a stream is
|
||||||
|
// created for both streaming and unary RPCs, and hence is a good place to
|
||||||
|
// track active RPC count.
|
||||||
|
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Add a calloption, to decrement the active call count, that gets executed
|
||||||
|
// when the RPC completes.
|
||||||
|
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
|
||||||
|
|
||||||
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
|
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
|
||||||
// validate md
|
// validate md
|
||||||
if err := imetadata.Validate(md); err != nil {
|
if err := imetadata.Validate(md); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue