mirror of https://github.com/grpc/grpc-go.git
idle: decrement active call count for streaming RPCs only when the call completes (#6610)
This commit is contained in:
parent
b0a946cf0c
commit
254bccb3bd
5
call.go
5
call.go
|
@ -27,11 +27,6 @@ import (
|
|||
//
|
||||
// All errors returned by Invoke are compatible with the status package.
|
||||
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
|
||||
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cc.idlenessMgr.OnCallEnd()
|
||||
|
||||
// allow interceptor to see all applicable call options, which means those
|
||||
// configured as defaults from dial option as well as per-call options
|
||||
opts = combine(cc.dopts.callOptions, opts)
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -179,80 +180,113 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
|||
// Tests the case where channel idleness is enabled by passing a small value for
|
||||
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
|
||||
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
||||
// Create a ClientConn with a short idle_timeout.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
dopts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithResolvers(r),
|
||||
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
|
||||
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
|
||||
}
|
||||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
|
||||
if err != nil {
|
||||
t.Fatalf("grpc.Dial() failed: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { cc.Close() })
|
||||
|
||||
// Start a test backend which keeps a unary RPC call active by blocking on a
|
||||
// channel that is closed by the test later on. Also push an address update
|
||||
// via the resolver.
|
||||
blockCh := make(chan struct{})
|
||||
backend := &stubserver.StubServer{
|
||||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||
<-blockCh
|
||||
return &testpb.Empty{}, nil
|
||||
tests := []struct {
|
||||
name string
|
||||
makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error
|
||||
}{
|
||||
{
|
||||
name: "unary",
|
||||
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||
return fmt.Errorf("EmptyCall RPC failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "streaming",
|
||||
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
|
||||
stream, err := client.FullDuplexCall(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("FullDuplexCall RPC failed: %v", err)
|
||||
}
|
||||
if _, err := stream.Recv(); err != nil && err != io.EOF {
|
||||
t.Fatalf("stream.Recv() failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := backend.StartServer(); err != nil {
|
||||
t.Fatalf("Failed to start backend: %v", err)
|
||||
}
|
||||
t.Cleanup(backend.Stop)
|
||||
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
|
||||
|
||||
// Verify that the ClientConn moves to READY.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
// Create a ClientConn with a short idle_timeout.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
dopts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithResolvers(r),
|
||||
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
|
||||
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
|
||||
}
|
||||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
|
||||
if err != nil {
|
||||
t.Fatalf("grpc.Dial() failed: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { cc.Close() })
|
||||
|
||||
// Spawn a goroutine which checks expected state transitions and idleness
|
||||
// channelz trace events. It eventually closes `blockCh`, thereby unblocking
|
||||
// the server RPC handler and the unary call below.
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(blockCh)
|
||||
// Verify that the ClientConn stays in READY.
|
||||
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
|
||||
defer sCancel()
|
||||
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
|
||||
// Start a test backend which keeps a unary RPC call active by blocking on a
|
||||
// channel that is closed by the test later on. Also push an address update
|
||||
// via the resolver.
|
||||
blockCh := make(chan struct{})
|
||||
backend := &stubserver.StubServer{
|
||||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||
<-blockCh
|
||||
return &testpb.Empty{}, nil
|
||||
},
|
||||
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
|
||||
<-blockCh
|
||||
return nil
|
||||
},
|
||||
}
|
||||
if err := backend.StartServer(); err != nil {
|
||||
t.Fatalf("Failed to start backend: %v", err)
|
||||
}
|
||||
t.Cleanup(backend.Stop)
|
||||
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
|
||||
|
||||
// Verify that there are no idleness related channelz events.
|
||||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
// Verify that the ClientConn moves to READY.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
||||
|
||||
// Unblock the unary RPC on the server.
|
||||
errCh <- nil
|
||||
}()
|
||||
// Spawn a goroutine which checks expected state transitions and idleness
|
||||
// channelz trace events.
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(blockCh)
|
||||
|
||||
// Make a unary RPC that blocks on the server, thereby ensuring that the
|
||||
// count of active RPCs on the client is non-zero.
|
||||
client := testgrpc.NewTestServiceClient(cc)
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||
t.Errorf("EmptyCall RPC failed: %v", err)
|
||||
}
|
||||
// Verify that the ClientConn stays in READY.
|
||||
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
|
||||
defer sCancel()
|
||||
if cc.WaitForStateChange(sCtx, connectivity.Ready) {
|
||||
errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState())
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
|
||||
// Verify that there are no idleness related channelz events.
|
||||
//
|
||||
// TODO: Improve the checks here. If these log strings are
|
||||
// changed in the code, these checks will continue to pass.
|
||||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
|
||||
}()
|
||||
|
||||
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
|
||||
t.Fatalf("%s rpc failed: %v", test.name, err)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
15
stream.go
15
stream.go
|
@ -158,11 +158,6 @@ type ClientStream interface {
|
|||
// If none of the above happen, a goroutine and a context will be leaked, and grpc
|
||||
// will not call the optionally-configured stats handler with a stats.End message.
|
||||
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
|
||||
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cc.idlenessMgr.OnCallEnd()
|
||||
|
||||
// allow interceptor to see all applicable call options, which means those
|
||||
// configured as defaults from dial option as well as per-call options
|
||||
opts = combine(cc.dopts.callOptions, opts)
|
||||
|
@ -179,6 +174,16 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||
}
|
||||
|
||||
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
||||
// Start tracking the RPC for idleness purposes. This is where a stream is
|
||||
// created for both streaming and unary RPCs, and hence is a good place to
|
||||
// track active RPC count.
|
||||
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Add a calloption, to decrement the active call count, that gets executed
|
||||
// when the RPC completes.
|
||||
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
|
||||
|
||||
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
|
||||
// validate md
|
||||
if err := imetadata.Validate(md); err != nil {
|
||||
|
|
Loading…
Reference in New Issue