balancer: make producer RPCs block until the SubConn is READY (#6236)

This commit is contained in:
Doug Fawley 2023-05-02 10:09:23 -07:00 committed by GitHub
parent b153b006ce
commit ed3ceba605
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 187 additions and 17 deletions

View File

@ -25,14 +25,12 @@ import (
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)
// ccBalancerWrapper sits between the ClientConn and the Balancer.
@ -405,14 +403,13 @@ func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
return acbw.ac
}
var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected")
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, returns errSubConnNotReady.
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
transport := acbw.ac.getReadyTransport()
if transport == nil {
return nil, errSubConnNotReady
transport, err := acbw.ac.getTransport(ctx)
if err != nil {
return nil, err
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}

View File

@ -742,6 +742,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@ -1122,7 +1123,8 @@ type addrConn struct {
addrs []resolver.Address // All addresses that the resolver resolved to.
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity.State
state connectivity.State
stateChan chan struct{} // closed and recreated on every state change.
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
@ -1136,6 +1138,9 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
if ac.state == s {
return
}
// When changing states, reset the state change channel.
close(ac.stateChan)
ac.stateChan = make(chan struct{})
ac.state = s
if lastErr == nil {
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
@ -1438,6 +1443,29 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {
return nil
}
// getTransport waits until the addrconn is ready and returns the transport.
// If the context expires first, returns an appropriate status. If the
// addrConn is stopped first, returns an Unavailable status error.
func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
for ctx.Err() == nil {
ac.mu.Lock()
t, state, sc := ac.transport, ac.state, ac.stateChan
ac.mu.Unlock()
if state == connectivity.Ready {
return t, nil
}
if state == connectivity.Shutdown {
return nil, status.Errorf(codes.Unavailable, "SubConn shutting down")
}
select {
case <-ctx.Done():
case <-sc:
}
}
return nil, status.FromContextError(ctx.Err()).Err()
}
// tearDown starts to tear down the addrConn.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct

View File

@ -79,8 +79,8 @@ func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOpt
p := pr.(*producer)
p.registerListener(l, opts.ReportInterval)
// TODO: When we can register for SubConn state updates, don't call run()
// until READY and automatically call stop() on SHUTDOWN.
// TODO: When we can register for SubConn state updates, automatically call
// stop() on SHUTDOWN.
// If stop is called multiple times, prevent it from having any effect on
// subsequent calls.
@ -175,12 +175,11 @@ func (p *producer) run(ctx context.Context) {
logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
return
case status.Code(err) == codes.Unavailable:
// The SubConn is not currently ready; backoff silently.
//
// TODO: don't attempt the stream until the state is READY to
// minimize the chances of this case and to avoid using the
// exponential backoff mechanism, as we should know it's safe to
// retry when the state is READY again.
// TODO: this code should ideally log an error, too, but for now we
// receive this code when shutting down the ClientConn. Once we
// can determine the state or ensure the producer is stopped before
// the stream ends, we can log an error when it's not a natural
// shutdown.
default:
// Log all other errors.
logger.Error("Received unexpected stream error:", err)

View File

@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/stubserver"
@ -1004,3 +1005,148 @@ func (s) TestMetadataInPickResult(t *testing.T) {
t.Fatalf("Mismatch in custom metadata received at test backend, got: %v, want %v", gotMDVal, wantMDVal)
}
}
// producerTestBalancerBuilder and producerTestBalancer start a producer which
// makes an RPC before the subconn is READY, then connects the subconn, and
// pushes the resulting error (expected to be nil) to rpcErrChan.
type producerTestBalancerBuilder struct {
rpcErrChan chan error
ctxChan chan context.Context
}
func (bb *producerTestBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &producerTestBalancer{cc: cc, rpcErrChan: bb.rpcErrChan, ctxChan: bb.ctxChan}
}
const producerTestBalancerName = "producer_test_balancer"
func (bb *producerTestBalancerBuilder) Name() string { return producerTestBalancerName }
type producerTestBalancer struct {
cc balancer.ClientConn
rpcErrChan chan error
ctxChan chan context.Context
}
func (b *producerTestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Create the subconn, but don't connect it.
sc, err := b.cc.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
return fmt.Errorf("error creating subconn: %v", err)
}
// Create the producer. This will call the producer builder's Build
// method, which will try to start an RPC in a goroutine.
p := &testProducerBuilder{start: grpcsync.NewEvent(), rpcErrChan: b.rpcErrChan, ctxChan: b.ctxChan}
sc.GetOrBuildProducer(p)
// Wait here until the producer is about to perform the RPC, which should
// block until connected.
<-p.start.Done()
// Ensure the error chan doesn't get anything on it before we connect the
// subconn.
select {
case err := <-b.rpcErrChan:
go func() { b.rpcErrChan <- fmt.Errorf("Got unexpected data on rpcErrChan: %v", err) }()
default:
}
// Now we can connect, which will unblock the RPC above.
sc.Connect()
// The stub server requires a READY picker to be reported, to unblock its
// Start method. We won't make RPCs in our test, so a nil picker is okay.
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
return nil
}
func (b *producerTestBalancer) ResolverError(err error) {
panic(fmt.Sprintf("Unexpected resolver error: %v", err))
}
func (b *producerTestBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
func (b *producerTestBalancer) Close() {}
type testProducerBuilder struct {
start *grpcsync.Event
rpcErrChan chan error
ctxChan chan context.Context
}
func (b *testProducerBuilder) Build(cci interface{}) (balancer.Producer, func()) {
c := testgrpc.NewTestServiceClient(cci.(grpc.ClientConnInterface))
// Perform the RPC in a goroutine instead of during build because the
// subchannel's mutex is held here.
go func() {
ctx := <-b.ctxChan
b.start.Fire()
_, err := c.EmptyCall(ctx, &testpb.Empty{})
b.rpcErrChan <- err
}()
return nil, func() {}
}
// TestBalancerProducerBlockUntilReady tests that we get no RPC errors from
// producers when subchannels aren't ready.
func (s) TestBalancerProducerBlockUntilReady(t *testing.T) {
// rpcErrChan is given to the LB policy to report the status of the
// producer's one RPC.
ctxChan := make(chan context.Context, 1)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
ctxChan <- ctx
rpcErrChan := make(chan error)
balancer.Register(&producerTestBalancerBuilder{rpcErrChan: rpcErrChan, ctxChan: ctxChan})
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
// Start the server & client with the test producer LB policy.
svcCfg := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, producerTestBalancerName)
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(svcCfg)); err != nil {
t.Fatalf("Error starting testing server: %v", err)
}
defer ss.Stop()
// Receive the error from the producer's RPC, which should be nil.
if err := <-rpcErrChan; err != nil {
t.Fatalf("Received unexpected error from producer RPC: %v", err)
}
}
// TestBalancerProducerHonorsContext tests that producers that perform RPC get
// context errors correctly.
func (s) TestBalancerProducerHonorsContext(t *testing.T) {
// rpcErrChan is given to the LB policy to report the status of the
// producer's one RPC.
ctxChan := make(chan context.Context, 1)
ctx, cancel := context.WithCancel(context.Background())
ctxChan <- ctx
rpcErrChan := make(chan error)
balancer.Register(&producerTestBalancerBuilder{rpcErrChan: rpcErrChan, ctxChan: ctxChan})
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
// Start the server & client with the test producer LB policy.
svcCfg := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, producerTestBalancerName)
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(svcCfg)); err != nil {
t.Fatalf("Error starting testing server: %v", err)
}
defer ss.Stop()
cancel()
// Receive the error from the producer's RPC, which should be canceled.
if err := <-rpcErrChan; status.Code(err) != codes.Canceled {
t.Fatalf("RPC error: %v; want status.Code(err)=%v", err, codes.Canceled)
}
}