Add the initial service config support (#1009)

* Add the initial service config support

* start scWatcher later

* remove timeoutCh

* address the comments

* deal with dial timeout

* defer cancel for the newly created context for correct lifetime management

* fix the defer order

* added other 2 missing cancels
This commit is contained in:
Qi Zhao 2016-12-19 16:31:00 -08:00 committed by Menghan Li
parent 8712952b7d
commit 09aecb094e
6 changed files with 229 additions and 33 deletions

View File

@ -145,6 +145,14 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok {
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
defer cancel()
}
}
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
@ -211,6 +219,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}

View File

@ -54,6 +54,8 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
// DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
// removed in Q1 2017.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@ -93,6 +95,7 @@ type dialOptions struct {
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
}
@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption {
}
}
// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return func(o *dialOptions) {
o.scChan = c
}
}
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
// when backing off after failed connection attempts.
func WithBackoffMaxDelay(md time.Duration) DialOption {
@ -260,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt(&cc.dopts)
}
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
defer func() {
select {
case <-ctx.Done():
@ -272,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
for _, opt := range opts {
opt(&cc.dopts)
if cc.dopts.scChan != nil {
// Wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Set defaults.
if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{}
@ -297,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
waitC := make(chan error, 1)
go func() {
var addrs []Address
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
@ -332,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
close(waitC)
}()
var timeoutCh <-chan time.Time
if cc.dopts.timeout > 0 {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
@ -343,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}
case <-timeoutCh:
return nil, ErrClientConnTimeout
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
return cc, nil
}
@ -397,6 +425,7 @@ type ClientConn struct {
dopts dialOptions
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
}
@ -435,6 +464,24 @@ func (cc *ClientConn) lbWatcher() {
}
}
func (cc *ClientConn) scWatcher() {
for {
select {
case sc, ok := <-cc.dopts.scChan:
if !ok {
return
}
cc.mu.Lock()
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc.sc = sc
cc.mu.Unlock()
case <-cc.ctx.Done():
return
}
}
}
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
@ -522,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
return nil
}
// TODO: Avoid the locking here.
func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
cc.mu.RLock()
defer cc.mu.RUnlock()
m, ok = cc.sc.Methods[method]
return
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
var (
ac *addrConn

View File

@ -50,8 +50,8 @@ func TestDialTimeout(t *testing.T) {
if err == nil {
conn.Close()
}
if err != ErrClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
if err != context.DeadlineExceeded {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
}
}
@ -64,8 +64,8 @@ func TestTLSDialTimeout(t *testing.T) {
if err == nil {
conn.Close()
}
if err != ErrClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
if err != context.DeadlineExceeded {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
}
}

View File

@ -472,6 +472,44 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
// MethodConfig defines the configuration recommended by the service providers for a
// particular method.
// This is EXPERIMENTAL and subject to change.
type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here.
WaitForReady bool
// Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized,
// uncompressed payload in bytes. The actual value used is the minumum of the value
// specified here and the value set by the application via the gRPC client API. If
// either one is not set, then the other will be used. If neither is set, then the
// built-in default is used.
// TODO: support this.
MaxReqSize uint64
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
// TODO: support this.
MaxRespSize uint64
}
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
// This is EXPERIMENTAL and subject to change.
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB Balancer
// Methods contains a map for the methods in this service.
Methods map[string]MethodConfig
}
// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//

View File

@ -107,11 +107,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
s *transport.Stream
put func()
t transport.ClientTransport
s *transport.Stream
put func()
cancel context.CancelFunc
)
c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok {
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
}
}
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
@ -200,12 +207,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
cancel: cancel,
put: put,
t: t,
@ -249,16 +257,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
desc *StreamDesc
codec Codec
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
desc *StreamDesc
codec Codec
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@ -449,6 +458,11 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
defer func() {
if cs.cancel != nil {
cs.cancel()
}
}()
cs.mu.Lock()
defer cs.mu.Unlock()
for _, o := range cs.opts {

View File

@ -428,6 +428,7 @@ type test struct {
streamClientInt grpc.StreamClientInterceptor
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
sc <-chan grpc.ServiceConfig
// srv and srvAddr are set once startServer is called.
srv *grpc.Server
@ -450,7 +451,9 @@ func (te *test) tearDown() {
te.restoreLogs()
te.restoreLogs = nil
}
te.srv.Stop()
if te.srv != nil {
te.srv.Stop()
}
}
// newTest returns a new test using the provided testing.T and
@ -547,6 +550,10 @@ func (te *test) clientConn() *grpc.ClientConn {
grpc.WithUserAgent(te.userAgent),
}
if te.sc != nil {
opts = append(opts, grpc.WithServiceConfig(te.sc))
}
if te.clientCompression {
opts = append(opts,
grpc.WithCompressor(grpc.NewGZIPCompressor()),
@ -1013,6 +1020,79 @@ func testFailFast(t *testing.T, e env) {
awaitNewConnLogOutput()
}
func TestServiceConfig(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testServiceConfig(t, e)
}
}
func testServiceConfig(t *testing.T, e env) {
te := newTest(t, e)
ch := make(chan grpc.ServiceConfig)
te.sc = ch
te.userAgent = testAppUA
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
defer te.tearDown()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
mc := grpc.MethodConfig{
WaitForReady: true,
Timeout: time.Millisecond,
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m,
}
ch <- sc
}()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
wg.Wait()
// Generate a service config update.
mc := grpc.MethodConfig{
WaitForReady: false,
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m,
}
ch <- sc
// Loop until the new update becomes effective.
for {
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
continue
}
break
}
// The following RPCs are expected to become fail-fast.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
}
if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.Unavailable {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.Unavailable)
}
}
func TestTap(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {