redesign the API

This commit is contained in:
iamqizhao 2015-09-25 13:21:25 -07:00
parent 2aa9899560
commit ec99a32572
4 changed files with 78 additions and 45 deletions

30
call.go
View File

@ -116,10 +116,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
o.after(&c)
}
}()
conn, err := cc.picker.Pick()
if err != nil {
return toRPCErr(err)
}
//conn, err := cc.picker.Pick()
//if err != nil {
// return toRPCErr(err)
//}
if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
@ -136,10 +136,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
callHdr := &transport.CallHdr{
Host: conn.authority,
Method: method,
}
//callHdr := &transport.CallHdr{
// Host: conn.authority,
// Method: method,
//}
topts := &transport.Options{
Last: true,
Delay: false,
@ -152,13 +152,25 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err error
t transport.ClientTransport
stream *transport.Stream
conn *Conn
)
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
}
t, err = conn.wait(ctx)
conn, err = cc.picker.Pick()
if err != nil {
return toRPCErr(err)
}
callHdr := &transport.CallHdr{
Host: conn.authority,
Method: method,
}
t, err = conn.Wait(ctx)
if err != nil {
if err == ErrTransientFailure {
continue
}
if lastErr != nil {
// This was a retry; return the error from the last attempt.
return toRPCErr(lastErr)

View File

@ -65,6 +65,8 @@ var (
// ErrClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
// ErrTransientFailure indicates the connection failed due to a transient error.
ErrTransientFailure = errors.New("transient connection failure")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@ -148,7 +150,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
opt(&dopts)
}
if dopts.picker == nil {
p, err := newSimplePicker(target, dopts)
p, err := newUnicastPicker(target, dopts)
if err != nil {
return nil, err
}
@ -198,11 +200,7 @@ type ClientConn struct {
// State returns the connectivity state of the Conn used for next upcoming RPC.
// This is EXPERIMENTAL API.
func (cc *ClientConn) State() ConnectivityState {
c := cc.picker.Peek()
if c == nil {
return Idle
}
return c.getState()
return cc.picker.State()
}
// WaitForStateChange blocks until the state changes to something other than the sourceState
@ -210,11 +208,7 @@ func (cc *ClientConn) State() ConnectivityState {
// or timeout fires, and true otherwise.
// This is EXPERIEMENTAL API.
func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
c := cc.picker.Peek()
if c == nil {
return false
}
return c.waitForStateChange(timeout, sourceState)
return cc.picker.WaitForStateChange(timeout, sourceState)
}
// Close starts to tear down the ClientConn.
@ -317,16 +311,17 @@ func (cc *Conn) errorf(format string, a ...interface{}) {
}
}
// getState returns the connectivity state of the Conn
func (cc *Conn) getState() ConnectivityState {
// State returns the connectivity state of the Conn
func (cc *Conn) State() ConnectivityState {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.state
}
// waitForStateChange blocks until the state changes to something other than the sourceState
// WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires. It returns false if timeout fires and true otherwise.
func (cc *Conn) waitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
// TODO(zhaoq): Rewrite for complex Picker.
func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
start := time.Now()
cc.mu.Lock()
defer cc.mu.Unlock()
@ -402,6 +397,10 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
cc.errorf("transient failure: %v", err)
cc.state = TransientFailure
cc.stateCV.Broadcast()
if cc.ready != nil {
close(cc.ready)
cc.ready = nil
}
cc.mu.Unlock()
sleepTime -= time.Since(connectTime)
if sleepTime < 0 {
@ -468,9 +467,8 @@ func (cc *Conn) transportMonitor() {
}
}
// When wait returns, either the new transport is up or ClientConn is
// closing.
func (cc *Conn) wait(ctx context.Context) (transport.ClientTransport, error) {
// Wait blocks until i) the new transport is up or ii) ctx is done or iii)
func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
for {
cc.mu.Lock()
switch {
@ -480,6 +478,11 @@ func (cc *Conn) wait(ctx context.Context) (transport.ClientTransport, error) {
case cc.state == Ready:
cc.mu.Unlock()
return cc.transport, nil
case cc.state == TransientFailure:
cc.mu.Unlock()
// Break out so that the caller gets chance to pick another transport to
// perform rpc instead of sticking to this transport.
return nil, ErrTransientFailure
default:
ready := cc.ready
if ready == nil {

View File

@ -1,43 +1,50 @@
package grpc
import (
"time"
)
// Picker picks a Conn for RPC requests.
// This is EXPERIMENTAL and Please do not implement your own Picker for now.
type Picker interface {
// Pick returns the Conn to use for the upcoming RPC. It may return different
// Conn's up to the implementation.
Pick() (*Conn, error)
// Peek returns the Conn use use for the next upcoming RPC. It returns the same
// Conn until next time Pick gets invoked.
Peek() *Conn
State() ConnectivityState
WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool
// Close closes all the Conn's owned by this Picker.
Close() error
}
func newSimplePicker(target string, dopts dialOptions) (Picker, error) {
func newUnicastPicker(target string, dopts dialOptions) (Picker, error) {
c, err := NewConn(target, dopts)
if err != nil {
return nil, err
}
return &simplePicker{
return &unicastPicker{
conn: c,
}, nil
}
// simplePicker is default Picker which is used when there is no custom Picker
// unicastPicker is the default Picker which is used when there is no custom Picker
// specified by users. It always picks the same Conn.
type simplePicker struct {
type unicastPicker struct {
conn *Conn
}
func (p *simplePicker) Pick() (*Conn, error) {
func (p *unicastPicker) Pick() (*Conn, error) {
return p.conn, nil
}
func (p *simplePicker) Peek() *Conn {
return p.conn
func (p *unicastPicker) State() ConnectivityState {
return p.conn.State()
}
func (p *simplePicker) Close() error {
func (p *unicastPicker) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
return p.conn.WaitForStateChange(timeout, sourceState)
}
func (p *unicastPicker) Close() error {
if p.conn != nil {
return p.conn.Close()
}

View File

@ -96,9 +96,24 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
conn, err := cc.picker.Pick()
if err != nil {
return nil, toRPCErr(err)
var (
conn *Conn
t transport.ClientTransport
err error
)
for {
conn, err = cc.picker.Pick()
if err != nil {
return nil, toRPCErr(err)
}
t, err = conn.Wait(ctx)
if err != nil {
if err == ErrTransientFailure {
continue
}
return nil, toRPCErr(err)
}
break
}
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
callHdr := &transport.CallHdr{
@ -118,10 +133,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false)
}
t, err := conn.wait(ctx)
if err != nil {
return nil, toRPCErr(err)
}
s, err := t.NewStream(ctx, callHdr)
if err != nil {
return nil, toRPCErr(err)