Compare commits

...

7 Commits

Author SHA1 Message Date
Easwar Swaminathan 62726d4336
update version to 1.58.1 (#6629) 2023-09-13 14:31:22 -07:00
Easwar Swaminathan fa6d9abecb
cherry-pick 6610 and 6620 (#6627) 2023-09-13 13:37:33 -07:00
Doug Fawley 467fbf2a55
Change version to 1.58.1-dev (#6580)
Co-authored-by: Arvind Bright <arvind.bright100@gmail.com>
2023-09-06 15:12:36 -07:00
Doug Fawley c2b0797a53
Change version to 1.58.0 (#6579) 2023-09-06 10:16:07 -07:00
Arvind Bright 0467e471c2
balancer/leastrequest: Cache atomic load and also add concurrent rpc test (#6607)
Co-authored-by: Zach Reyes <39203661+zasweq@users.noreply.github.com>
2023-09-05 15:47:23 -07:00
Arvind Bright 5d1c0ae143
leastrequest: fix data race in leastrequest picker (#6606)
Co-authored-by: Huang Chong <hchtgh315@gmail.com>
2023-09-05 15:02:03 -07:00
Doug Fawley e26457d0c9
stream: swallow Header errors as we used to; RecvMsg can still return it (#6591) 2023-08-28 13:26:31 -07:00
8 changed files with 290 additions and 104 deletions

View File: balancer/leastrequest/balancer_test.go

@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -455,3 +456,57 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
 		t.Fatalf("addr count (-got:, +want): %v", diff)
 	}
 }
+
+// TestConcurrentRPCs tests concurrent RPCs on the least request balancer. It
+// configures a channel with a least request balancer as the top level balancer,
+// and makes 100 RPCs asynchronously. This makes sure no race conditions happen
+// in this scenario.
+func (s) TestConcurrentRPCs(t *testing.T) {
+	addresses := setupBackends(t)
+
+	mr := manual.NewBuilderWithScheme("lr-e2e")
+	defer mr.Close()
+
+	// Configure least request as top level balancer of channel.
+	lrscJSON := `
+{
+  "loadBalancingConfig": [
+    {
+      "least_request_experimental": {
+        "choiceCount": 2
+      }
+    }
+  ]
+}`
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
+	firstTwoAddresses := []resolver.Address{
+		{Addr: addresses[0]},
+		{Addr: addresses[1]},
+	}
+	mr.InitialState(resolver.State{
+		Addresses:     firstTwoAddresses,
+		ServiceConfig: sc,
+	})
+
+	cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	testServiceClient := testgrpc.NewTestServiceClient(cc)
+
+	var wg sync.WaitGroup
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 5; j++ {
+				testServiceClient.EmptyCall(ctx, &testpb.Empty{})
+			}
+		}()
+	}
+	wg.Wait()
+}
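
For context, the least_request_experimental policy exercised by this test can be enabled on an ordinary channel by passing the same service config JSON as a dial option instead of going through the internal ParseServiceConfig helper used above. A minimal sketch, not part of this diff: it assumes the policy is registered by importing the balancer/leastrequest package, and note that as of this release the policy is additionally gated behind the GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST environment variable. The target address is a placeholder.

package main

import (
	"google.golang.org/grpc"
	_ "google.golang.org/grpc/balancer/leastrequest" // registers least_request_experimental
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// "example.com:50051" is a hypothetical backend address.
	cc, err := grpc.Dial("example.com:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{
		  "loadBalancingConfig": [
		    {"least_request_experimental": {"choiceCount": 2}}
		  ]
		}`))
	if err != nil {
		panic(err)
	}
	defer cc.Close()
}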

View File: balancer/leastrequest/balancer.go

@@ -80,7 +80,7 @@ func (bb) Name() string {
 }
 
 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
-	b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
+	b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
 	baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
 	b.Balancer = baseBuilder.Build(cc, bOpts)
 	return b
@@ -92,7 +92,7 @@ type leastRequestBalancer struct {
 	balancer.Balancer
 
 	choiceCount uint32
-	scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates.
+	scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
 }
 
 func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -108,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 
 type scWithRPCCount struct {
 	sc      balancer.SubConn
-	numRPCs *int32
+	numRPCs *atomic.Int32
 }
 
 func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
@@ -126,7 +126,7 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
 	// Create new refs if needed.
 	for sc := range info.ReadySCs {
 		if _, ok := lrb.scRPCCounts[sc]; !ok {
-			lrb.scRPCCounts[sc] = new(int32)
+			lrb.scRPCCounts[sc] = new(atomic.Int32)
 		}
 	}
 
@@ -155,25 +155,24 @@ type picker struct {
 
 func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
 	var pickedSC *scWithRPCCount
+	var pickedSCNumRPCs int32
 	for i := 0; i < int(p.choiceCount); i++ {
 		index := grpcranduint32() % uint32(len(p.subConns))
 		sc := p.subConns[index]
-		if pickedSC == nil {
-			pickedSC = &sc
-			continue
-		}
-		if *sc.numRPCs < *pickedSC.numRPCs {
+		n := sc.numRPCs.Load()
+		if pickedSC == nil || n < pickedSCNumRPCs {
 			pickedSC = &sc
+			pickedSCNumRPCs = n
 		}
 	}
 	// "The counter for a subchannel should be atomically incremented by one
 	// after it has been successfully picked by the picker." - A48
-	atomic.AddInt32(pickedSC.numRPCs, 1)
+	pickedSC.numRPCs.Add(1)
 	// "the picker should add a callback for atomically decrementing the
 	// subchannel counter once the RPC finishes (regardless of Status code)." -
 	// A48.
 	done := func(balancer.DoneInfo) {
-		atomic.AddInt32(pickedSC.numRPCs, -1)
+		pickedSC.numRPCs.Add(-1)
 	}
 	return balancer.PickResult{
 		SubConn: pickedSC.sc,
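
The race fixed here (#6606) came from mixing atomic writes with plain reads: Pick runs concurrently on many goroutines, and the old code read *sc.numRPCs without synchronization while done callbacks were performing atomic.AddInt32 on the same counters. A minimal standalone sketch of the corrected pattern, not grpc-go code, assuming Go 1.19+ for atomic.Int32 (run with -race to confirm it is clean):

package main

import (
	"sync"
	"sync/atomic"
)

func main() {
	// numRPCs stands in for one subchannel's counter. With *int32, the old
	// comparison "*sc.numRPCs < *pickedSC.numRPCs" was a plain read racing
	// against atomic.AddInt32 from done callbacks; atomic.Int32 makes the
	// read side atomic as well.
	var numRPCs atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			numRPCs.Add(1)     // picked: increment before the RPC starts
			_ = numRPCs.Load() // the picker compares counters with atomic loads
			numRPCs.Add(-1)    // done callback: decrement when the RPC finishes
		}()
	}
	wg.Wait()
}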

View File: call.go

@@ -27,11 +27,6 @@ import (
 //
 // All errors returned by Invoke are compatible with the status package.
 func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
-	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
-		return err
-	}
-	defer cc.idlenessMgr.OnCallEnd()
-
 	// allow interceptor to see all applicable call options, which means those
 	// configured as defaults from dial option as well as per-call options
 	opts = combine(cc.dopts.callOptions, opts)

View File: clientconn.go

@@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
 	ac.cancel()
 	ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
 
-	// We have to defer here because GracefulClose => Close => onClose, which
-	// requires locking ac.mu.
+	// We have to defer here because GracefulClose => onClose, which requires
+	// locking ac.mu.
 	if ac.transport != nil {
 		defer ac.transport.GracefulClose()
 		ac.transport = nil
@@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) {
 	ac.updateConnectivityState(connectivity.Shutdown, nil)
 	ac.cancel()
 	ac.curAddr = resolver.Address{}
-	if err == errConnDrain && curTr != nil {
-		// GracefulClose(...) may be executed multiple times when
-		// i) receiving multiple GoAway frames from the server; or
-		// ii) there are concurrent name resolver/Balancer triggered
-		// address removal and GoAway.
-		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
-		ac.mu.Unlock()
-		curTr.GracefulClose()
-		ac.mu.Lock()
-	}
 	channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
 		Desc:     "Subchannel deleted",
 		Severity: channelz.CtInfo,
@@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) {
 	// being deleted right away.
 	channelz.RemoveEntry(ac.channelzID)
 	ac.mu.Unlock()
+
+	// We have to release the lock before the call to GracefulClose/Close here
+	// because both of them call onClose(), which requires locking ac.mu.
+	if curTr != nil {
+		if err == errConnDrain {
+			// Close the transport gracefully when the subConn is being shutdown.
+			//
+			// GracefulClose() may be executed multiple times if:
+			// - multiple GoAway frames are received from the server
+			// - there are concurrent name resolver or balancer triggered
+			//   address removal and GoAway
+			curTr.GracefulClose()
+		} else {
+			// Hard close the transport when the channel is entering idle or is
+			// being shutdown. In the case where the channel is being shutdown,
+			// closing of transports is also taken care of by cancelation of cc.ctx.
+			// But in the case where the channel is entering idle, we need to
+			// explicitly close the transports here. Instead of distinguishing
+			// between these two cases, it is simpler to close the transport
+			// unconditionally here.
+			curTr.Close(err)
+		}
+	}
 }
 
 func (ac *addrConn) getState() connectivity.State {
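
The reshuffling above exists to avoid re-entrant locking: GracefulClose and Close both end up invoking onClose, which acquires ac.mu, so they may only be called after tearDown has dropped the lock. A minimal standalone sketch of the pattern with hypothetical types (Go mutexes are not reentrant, so calling the callback while still holding the lock would self-deadlock):

package main

import "sync"

type conn struct {
	mu     sync.Mutex
	closed bool
}

// onClose mirrors the transport's onClose callback: it takes the lock itself.
func (c *conn) onClose() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
}

// tearDown mirrors addrConn.tearDown: mutate state under the lock, then
// release it before triggering the callback that re-acquires it.
func (c *conn) tearDown() {
	c.mu.Lock()
	// ... update state under c.mu ...
	c.mu.Unlock() // calling c.onClose() before this line would deadlock
	c.onClose()
}

func main() {
	(&conn{}).tearDown()
}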

View File: test/idleness_test.go

@@ -22,6 +22,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"strings"
 	"testing"
 	"time"
@@ -141,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
 }
 
 // Tests the case where channel idleness is enabled by passing a small value for
-// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
+// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
+// the connection to the backend is closed.
 func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
 	// Create a ClientConn with a short idle_timeout.
 	r := manual.NewBuilderWithScheme("whatever")
@@ -158,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
 	t.Cleanup(func() { cc.Close() })
 
 	// Start a test backend and push an address update via the resolver.
-	backend := stubserver.StartTestService(t, nil)
+	lis := testutils.NewListenerWrapper(t, nil)
+	backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
 	t.Cleanup(backend.Stop)
 	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
@@ -167,6 +170,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
 	defer cancel()
 	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
 
+	// Retrieve the wrapped conn from the listener.
+	v, err := lis.NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to retrieve conn from test listener: %v", err)
+	}
+	conn := v.(*testutils.ConnWrapper)
+
 	// Verify that the ClientConn moves to IDLE as there is no activity.
 	testutils.AwaitState(ctx, t, cc, connectivity.Idle)
@@ -174,85 +184,123 @@
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
// Verify that the previously open connection is closed.
if _, err := conn.CloseCh.Receive(ctx); err != nil {
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
}
}
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-blockCh
return &testpb.Empty{}, nil
tests := []struct {
name string
makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error
}{
{
name: "unary",
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
return fmt.Errorf("EmptyCall RPC failed: %v", err)
}
return nil
},
},
{
name: "streaming",
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
stream, err := client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("FullDuplexCall RPC failed: %v", err)
}
if _, err := stream.Recv(); err != nil && err != io.EOF {
t.Fatalf("stream.Recv() failed: %v", err)
}
return nil
},
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events. It eventually closes `blockCh`, thereby unblocking
// the server RPC handler and the unary call below.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)
// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-blockCh
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
<-blockCh
return nil
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
errCh <- err
return
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
errCh <- err
return
}
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
// Unblock the unary RPC on the server.
errCh <- nil
}()
// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)
// Make a unary RPC that blocks on the server, thereby ensuring that the
// count of active RPCs on the client is non-zero.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall RPC failed: %v", err)
}
// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
if cc.WaitForStateChange(sCtx, connectivity.Ready) {
errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState())
return
}
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
case <-ctx.Done():
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
// Verify that there are no idleness related channelz events.
//
// TODO: Improve the checks here. If these log strings are
// changed in the code, these checks will continue to pass.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
errCh <- err
return
}
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
}()
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
t.Fatalf("%s rpc failed: %v", test.name, err)
}
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
case <-ctx.Done():
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
}
})
}
}

View File: stream.go

@@ -91,7 +91,9 @@
 // status package.
 type ClientStream interface {
 	// Header returns the header metadata received from the server if there
-	// is any. It blocks if the metadata is not ready to read.
+	// is any. It blocks if the metadata is not ready to read. If the metadata
+	// is nil and the error is also nil, then the stream was terminated without
+	// headers, and the status can be discovered by calling RecvMsg.
 	Header() (metadata.MD, error)
 	// Trailer returns the trailer metadata from the server, if there is any.
 	// It must only be called after stream.CloseAndRecv has returned, or
@@ -156,11 +158,6 @@ type ClientStream interface {
 // If none of the above happen, a goroutine and a context will be leaked, and grpc
 // will not call the optionally-configured stats handler with a stats.End message.
 func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
-	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
-		return nil, err
-	}
-	defer cc.idlenessMgr.OnCallEnd()
-
 	// allow interceptor to see all applicable call options, which means those
 	// configured as defaults from dial option as well as per-call options
 	opts = combine(cc.dopts.callOptions, opts)
@@ -177,6 +174,16 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
 }
 
 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+	// Start tracking the RPC for idleness purposes. This is where a stream is
+	// created for both streaming and unary RPCs, and hence is a good place to
+	// track active RPC count.
+	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
+		return nil, err
+	}
+	// Add a calloption, to decrement the active call count, that gets executed
+	// when the RPC completes.
+	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
+
 	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
 		// validate md
 		if err := imetadata.Validate(md); err != nil {
@@ -802,7 +809,8 @@ func (cs *clientStream) Header() (metadata.MD, error) {
 	if err != nil {
 		cs.finish(err)
-		return nil, err
+		// Do not return the error. The user should get it by calling Recv().
+		return nil, nil
 	}
 
 	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
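
Moving the idleness hooks from Invoke/NewStream into newClientStream puts the begin/end accounting on the one code path shared by unary and streaming RPCs, with the end half deferred until the RPC actually finishes via a CallOption callback. A minimal standalone sketch of that accounting, using hypothetical names rather than the internal grpc-go API:

package main

import (
	"fmt"
	"sync/atomic"
)

// idleTracker mimics the idleness manager's call tracking: the channel may
// only enter idle while activeCalls is zero.
type idleTracker struct{ activeCalls atomic.Int64 }

func (t *idleTracker) onCallBegin() { t.activeCalls.Add(1) }
func (t *idleTracker) onCallEnd()   { t.activeCalls.Add(-1) }

// newStream stands in for newClientStream: it registers the call up front and
// hands back a completion callback, analogous to the OnFinish call option
// prepended in the hunk above.
func (t *idleTracker) newStream() (finish func(error)) {
	t.onCallBegin()
	return func(error) { t.onCallEnd() }
}

func main() {
	t := &idleTracker{}
	finish := t.newStream()
	fmt.Println("active calls:", t.activeCalls.Load()) // 1: channel must stay out of idle
	finish(nil)
	fmt.Println("active calls:", t.activeCalls.Load()) // 0: the idle timer may fire again
}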

View File: stream_test.go (new file, 67 lines)

@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/stubserver"
+	"google.golang.org/grpc/interop/grpc_testing"
+	"google.golang.org/grpc/status"
+)
+
+const defaultTestTimeout = 10 * time.Second
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+func (s) TestStream_Header_TrailersOnly(t *testing.T) {
+	ss := stubserver.StubServer{
+		FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
+			return status.Errorf(codes.NotFound, "a test error")
+		},
+	}
+	if err := ss.Start(nil); err != nil {
+		t.Fatal("Error starting server:", err)
+	}
+	defer ss.Stop()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	s, err := ss.Client.FullDuplexCall(ctx)
+	if err != nil {
+		t.Fatal("Error starting call", err)
+	}
+	if md, err := s.Header(); md != nil || err != nil {
+		t.Fatalf("s.Header() = %v, %v; want nil, nil", md, err)
+	}
+	if _, err := s.Recv(); status.Code(err) != codes.NotFound {
+		t.Fatalf("s.Recv() = _, %v; want _, err.Code()=codes.NotFound", err)
+	}
+}

View File: version.go

@@ -19,4 +19,4 @@
 package grpc
 
 // Version is the current grpc version.
-const Version = "1.58.0-dev"
+const Version = "1.58.1"