Add gRPC client side metrics (#2151)

Fixes #1880.

Updates google.golang.org/grpc and github.com/jmhodges/clock, both test suites pass. A few of the gRPC interfaces changed so this also fixes those breakages.
This commit is contained in:
Roland Bracewell Shoemaker 2016-09-09 12:17:36 -07:00 committed by Jacob Hoffman-Andrews
parent d75a44baa0
commit e187c92715
37 changed files with 1762 additions and 476 deletions

30
Godeps/Godeps.json generated
View File

@ -128,7 +128,8 @@
},
{
"ImportPath": "github.com/jmhodges/clock",
"Rev": "3c4ebd218625c9364c33db6d39c276d80c3090c6"
"Comment": "v1.0-4-g880ee4c",
"Rev": "880ee4c335489bc78d01e4d0a254ae880734bc15"
},
{
"ImportPath": "github.com/letsencrypt/go-safe-browsing-api",
@ -209,39 +210,48 @@
},
{
"ImportPath": "google.golang.org/grpc",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/codes",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/credentials",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/grpclog",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/internal",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/metadata",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/naming",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/peer",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "google.golang.org/grpc/transport",
"Rev": "5c7e717663448ccc1c9ff4d9ce75cdb1cca28121"
"Comment": "v1.0.1-GA-39-g52f6504",
"Rev": "52f6504dc290bd928a8139ba94e3ab32ed9a6273"
},
{
"ImportPath": "gopkg.in/gorp.v1",

View File

@ -168,7 +168,7 @@ func main() {
cmd.FailOnError(err, "Failed to create SA client")
if c.CA.PublisherService != nil {
conn, err := bgrpc.ClientSetup(c.CA.PublisherService)
conn, err := bgrpc.ClientSetup(c.CA.PublisherService, scope)
cmd.FailOnError(err, "Failed to load credentials and create connection to service")
cai.Publisher = bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(conn), c.CA.PublisherService.Timeout.Duration)
} else {

View File

@ -113,7 +113,7 @@ func main() {
amqpConf := c.RA.AMQP
var vac core.ValidationAuthority
if c.RA.VAService != nil {
conn, err := bgrpc.ClientSetup(c.RA.VAService)
conn, err := bgrpc.ClientSetup(c.RA.VAService, scope)
cmd.FailOnError(err, "Unable to create VA client")
vac = bgrpc.NewValidationAuthorityGRPCClient(conn)
} else {

View File

@ -97,7 +97,7 @@ func main() {
var caaClient caaPB.CAACheckerClient
if c.VA.CAAService != nil {
conn, err := bgrpc.ClientSetup(c.VA.CAAService)
conn, err := bgrpc.ClientSetup(c.VA.CAAService, scope)
cmd.FailOnError(err, "Failed to load credentials and create connection to service")
caaClient = caaPB.NewCAACheckerClient(conn)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/letsencrypt/boulder/cmd"
pb "github.com/letsencrypt/boulder/cmd/caa-checker/proto"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/metrics"
)
func main() {
@ -24,7 +25,7 @@ func main() {
ServerIssuerPath: "test/grpc-creds/ca.pem",
ClientCertificatePath: "test/grpc-creds/client.pem",
ClientKeyPath: "test/grpc-creds/key.pem",
})
}, metrics.NewNoopScope())
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to setup client connection: %s\n", err)
os.Exit(1)

View File

@ -574,7 +574,7 @@ func setupClients(c cmd.OCSPUpdaterConfig, stats metrics.Scope) (
var pubc core.Publisher
if c.Publisher != nil {
conn, err := bgrpc.ClientSetup(c.Publisher)
conn, err := bgrpc.ClientSetup(c.Publisher, stats)
cmd.FailOnError(err, "Failed to load credentials and create connection to service")
pubc = bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(conn), c.Publisher.Timeout.Duration)
} else {

View File

@ -3,10 +3,8 @@ package creds
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/credentials"
@ -25,7 +23,7 @@ func New(rootCAs *x509.CertPool, clientCerts []tls.Certificate) credentials.Tran
}
// ClientHandshake performs the TLS handshake for a client -> server connection
func (tc *transportCredentials) ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (net.Conn, credentials.AuthInfo, error) {
func (tc *transportCredentials) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, nil, err
@ -42,8 +40,8 @@ func (tc *transportCredentials) ClientHandshake(addr string, rawConn net.Conn, t
errChan <- conn.Handshake()
}()
select {
case <-time.After(timeout):
return nil, nil, errors.New("boulder/grpc/creds: TLS handshake timed out")
case <-ctx.Done():
return nil, nil, fmt.Errorf("boulder/grpc/creds: %s", ctx.Err())
case err := <-errChan:
if err != nil {
_ = rawConn.Close()

View File

@ -12,6 +12,8 @@ import (
"testing"
"time"
"golang.org/x/net/context"
"github.com/letsencrypt/boulder/test"
)
@ -58,7 +60,7 @@ func TestTransportCredentials(t *testing.T) {
_ = rawConnA.Close()
}()
conn, _, err := tc.ClientHandshake("A:2020", rawConnA, time.Second)
conn, _, err := tc.ClientHandshake(context.Background(), "A:2020", rawConnA)
test.AssertNotError(t, err, "tc.ClientHandshake failed")
test.Assert(t, conn != nil, "tc.ClientHandshake returned a nil net.Conn")
@ -71,7 +73,7 @@ func TestTransportCredentials(t *testing.T) {
_ = rawConnB.Close()
}()
conn, _, err = tc.ClientHandshake("B:3030", rawConnB, time.Second)
conn, _, err = tc.ClientHandshake(context.Background(), "B:3030", rawConnB)
test.AssertNotError(t, err, "tc.ClientHandshake failed")
test.Assert(t, conn != nil, "tc.ClientHandshake returned a nil net.Conn")
@ -82,11 +84,16 @@ func TestTransportCredentials(t *testing.T) {
_ = ln.Close()
}()
addrC := ln.Addr().String()
stop := make(chan struct{}, 1)
go func() {
for {
_, err := ln.Accept()
test.AssertNotError(t, err, "ln.Accept failed")
time.Sleep(time.Second)
select {
case <-stop:
return
default:
_, _ = ln.Accept()
time.Sleep(2 * time.Millisecond)
}
}
}()
@ -96,8 +103,12 @@ func TestTransportCredentials(t *testing.T) {
_ = rawConnB.Close()
}()
conn, _, err = tc.ClientHandshake("A:2020", rawConnC, time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
conn, _, err = tc.ClientHandshake(ctx, "A:2020", rawConnC)
test.AssertError(t, err, "tc.ClientHandshake didn't timeout")
test.AssertEquals(t, err.Error(), "boulder/grpc/creds: TLS handshake timed out")
test.AssertEquals(t, err.Error(), "boulder/grpc/creds: context deadline exceeded")
test.Assert(t, conn == nil, "tc.ClientHandshake returned a non-nil net.Conn on failure")
stop <- struct{}{}
}

View File

@ -2,7 +2,6 @@ package grpc
import (
"errors"
"fmt"
"github.com/letsencrypt/boulder/metrics"
@ -18,17 +17,39 @@ type serverInterceptor struct {
func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if info == nil {
si.stats.Inc("gRPC.NoInfo", 1)
si.stats.Inc("NoInfo", 1)
return nil, errors.New("passed nil *grpc.UnaryServerInfo")
}
si.stats.Inc(fmt.Sprintf("gRPC.%s", info.FullMethod), 1)
si.stats.GaugeDelta(fmt.Sprintf("gRPC.%s.InProgress", info.FullMethod), 1)
s := si.clk.Now()
methodScope := si.stats.NewScope(info.FullMethod)
methodScope.Inc("Calls", 1)
methodScope.GaugeDelta("InProgress", 1)
resp, err := handler(ctx, req)
si.stats.TimingDuration(fmt.Sprintf("gRPC.%s", info.FullMethod), si.clk.Now().Sub(s))
si.stats.GaugeDelta(fmt.Sprintf("gRPC.%s.InProgress", info.FullMethod), -1)
methodScope.TimingDuration("Latency", si.clk.Since(s))
methodScope.GaugeDelta("InProgress", -1)
if err != nil {
si.stats.Inc(fmt.Sprintf("gRPC.%s.Failed", info.FullMethod), 1)
methodScope.Inc("Failed", 1)
}
return resp, err
}
type clientInterceptor struct {
stats metrics.Scope
clk clock.Clock
}
// intercept fulfils the grpc.UnaryClientInterceptor interface, it should be noted that while this API
// is currently experimental the metrics it reports should be kept as stable as can be, *within reason*.
func (ci *clientInterceptor) intercept(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
s := ci.clk.Now()
methodScope := ci.stats.NewScope(method)
methodScope.Inc("Calls", 1)
methodScope.GaugeDelta("InProgress", 1)
err := invoker(ctx, method, req, reply, cc, opts...)
methodScope.TimingDuration("Latency", ci.clk.Since(s))
methodScope.GaugeDelta("InProgress", -1)
if err != nil {
methodScope.Inc("Failed", 1)
}
return err
}

View File

@ -24,29 +24,60 @@ func testHandler(_ context.Context, i interface{}) (interface{}, error) {
return nil, nil
}
func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
if method == "broke-test" {
return errors.New("")
}
fc.Sleep(time.Second)
return nil
}
func TestServerInterceptor(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
statter := metrics.NewMockStatter(ctrl)
stats := metrics.NewStatsdScope(statter, "fake")
stats := metrics.NewStatsdScope(statter, "fake", "gRPCServer")
si := serverInterceptor{stats, fc}
statter.EXPECT().Inc("fake.gRPC.NoInfo", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().Inc("fake.gRPCServer.NoInfo", int64(1), float32(1.0)).Return(nil)
_, err := si.intercept(context.Background(), nil, nil, testHandler)
test.AssertError(t, err, "si.intercept didn't fail with a nil grpc.UnaryServerInfo")
statter.EXPECT().Inc("fake.gRPC.test", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPC.test.InProgress", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().TimingDuration("fake.gRPC.test", time.Second, float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPC.test.InProgress", int64(-1), float32(1.0)).Return(nil)
statter.EXPECT().Inc("fake.gRPCServer.test.Calls", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCServer.test.InProgress", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().TimingDuration("fake.gRPCServer.test.Latency", time.Second, float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCServer.test.InProgress", int64(-1), float32(1.0)).Return(nil)
_, err = si.intercept(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "test"}, testHandler)
test.AssertNotError(t, err, "si.intercept failed with a non-nil grpc.UnaryServerInfo")
statter.EXPECT().Inc("fake.gRPC.broke-test", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPC.broke-test.InProgress", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().TimingDuration("fake.gRPC.broke-test", time.Duration(0), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPC.broke-test.InProgress", int64(-1), float32(1.0)).Return(nil)
statter.EXPECT().Inc("fake.gRPC.broke-test.Failed", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().Inc("fake.gRPCServer.broke-test.Calls", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCServer.broke-test.InProgress", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().TimingDuration("fake.gRPCServer.broke-test.Latency", time.Duration(0), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCServer.broke-test.InProgress", int64(-1), float32(1.0)).Return(nil)
statter.EXPECT().Inc("fake.gRPCServer.broke-test.Failed", int64(1), float32(1.0)).Return(nil)
_, err = si.intercept(context.Background(), 0, &grpc.UnaryServerInfo{FullMethod: "broke-test"}, testHandler)
test.AssertError(t, err, "si.intercept didn't fail when handler returned a error")
}
func TestClientInterceptor(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
statter := metrics.NewMockStatter(ctrl)
stats := metrics.NewStatsdScope(statter, "fake", "gRPCClient")
ci := clientInterceptor{stats, fc}
statter.EXPECT().Inc("fake.gRPCClient.test.Calls", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCClient.test.InProgress", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().TimingDuration("fake.gRPCClient.test.Latency", time.Second, float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCClient.test.InProgress", int64(-1), float32(1.0)).Return(nil)
err := ci.intercept(context.Background(), "test", nil, nil, nil, testInvoker)
test.AssertNotError(t, err, "ci.intercept failed with a non-nil grpc.UnaryServerInfo")
statter.EXPECT().Inc("fake.gRPCClient.broke-test.Calls", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCClient.broke-test.InProgress", int64(1), float32(1.0)).Return(nil)
statter.EXPECT().TimingDuration("fake.gRPCClient.broke-test.Latency", time.Duration(0), float32(1.0)).Return(nil)
statter.EXPECT().GaugeDelta("fake.gRPCClient.broke-test.InProgress", int64(-1), float32(1.0)).Return(nil)
statter.EXPECT().Inc("fake.gRPCClient.broke-test.Failed", int64(1), float32(1.0)).Return(nil)
err = ci.intercept(context.Background(), "broke-test", nil, nil, nil, testInvoker)
test.AssertError(t, err, "ci.intercept didn't fail when handler returned a error")
}

View File

@ -20,15 +20,20 @@ import (
// CodedError is a alias required to appease go vet
var CodedError = grpc.Errorf
var errNilScope = errors.New("boulder/grpc: received nil scope")
// ClientSetup loads various TLS certificates and creates a
// gRPC TransportCredentials that presents the client certificate
// and validates the certificate presented by the server is for a
// specific hostname and issued by the provided issuer certificate
// thens dials and returns a grpc.ClientConn to the remote service.
func ClientSetup(c *cmd.GRPCClientConfig) (*grpc.ClientConn, error) {
func ClientSetup(c *cmd.GRPCClientConfig, stats metrics.Scope) (*grpc.ClientConn, error) {
if len(c.ServerAddresses) == 0 {
return nil, fmt.Errorf("boulder/grpc: ServerAddresses is empty")
}
if stats == nil {
return nil, errNilScope
}
serverIssuerBytes, err := ioutil.ReadFile(c.ServerIssuerPath)
if err != nil {
return nil, err
@ -41,10 +46,12 @@ func ClientSetup(c *cmd.GRPCClientConfig) (*grpc.ClientConn, error) {
if err != nil {
return nil, err
}
ci := clientInterceptor{stats.NewScope("gRPCClient"), clock.Default()}
return grpc.Dial(
"", // Since our staticResolver provides addresses we don't need to pass an address here
grpc.WithTransportCredentials(bcreds.New(rootCAs, []tls.Certificate{clientCert})),
grpc.WithBalancer(grpc.RoundRobin(newStaticResolver(c.ServerAddresses))),
grpc.WithUnaryInterceptor(ci.intercept),
)
}
@ -53,6 +60,9 @@ func ClientSetup(c *cmd.GRPCClientConfig) (*grpc.ClientConn, error) {
// issued by the provided issuer certificate and presents a
// a server TLS certificate.
func NewServer(c *cmd.GRPCServerConfig, stats metrics.Scope) (*grpc.Server, net.Listener, error) {
if stats == nil {
return nil, nil, errNilScope
}
cert, err := tls.LoadX509KeyPair(c.ServerCertificatePath, c.ServerKeyPath)
if err != nil {
return nil, nil, err
@ -75,6 +85,6 @@ func NewServer(c *cmd.GRPCServerConfig, stats metrics.Scope) (*grpc.Server, net.
if err != nil {
return nil, nil, err
}
si := &serverInterceptor{stats, clock.Default()}
si := &serverInterceptor{stats.NewScope("gRPCServer"), clock.Default()}
return grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(si.intercept)), l, nil
}

View File

@ -21,39 +21,74 @@
package clock
import (
"sort"
"sync"
"time"
)
var systemClock Clock = sysClock{}
// Some in-use reflection-heavy systems, like facebookgo/inject, fail when given
// a value type like sysClock{}. Since it's hidden by an interface, this has
// surprised users. We fixed that by making systemClock a &sysClock.
var systemClock Clock = &sysClock{}
// Default returns a Clock that matches the actual system time.
func Default() Clock {
// New returns a Clock that matches the actual system time.
func New() Clock {
// This is a method instead of a public var to prevent folks from
// "making things work" by writing to the var instead of passing
// in a Clock.
return systemClock
}
// Deprecated: Default is just an alias for New but less memorable.
func Default() Clock {
return systemClock
}
// Clock is an abstraction over system time. New instances of it can
// be made with Default and NewFake.
type Clock interface {
// Now returns the Clock's current view of the time. Mutating the
// returned Time will not mutate the clock's time.
Now() time.Time
// Sleep causes the current goroutine to sleep for the given duration.
Sleep(time.Duration)
// After returns a channel that fires after the given duration.
After(time.Duration) <-chan time.Time
// Since is a short hand for Now().Sub(t).
Since(time.Time) time.Duration
// NewTimer makes a Timer based on this clock's time. Using Timers and
// negative durations in the Clock or Timer API is undefined behavior and
// may be changed.
NewTimer(time.Duration) *Timer
}
type sysClock struct{}
func (s sysClock) Now() time.Time {
func (s *sysClock) Now() time.Time {
return time.Now()
}
func (s sysClock) Sleep(d time.Duration) {
func (s *sysClock) Sleep(d time.Duration) {
time.Sleep(d)
}
func (s *sysClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
func (s *sysClock) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (s *sysClock) NewTimer(d time.Duration) *Timer {
tt := time.NewTimer(d)
return &Timer{C: tt.C, timer: tt}
}
// NewFake returns a FakeClock to be used in tests that need to
// manipulate time. Its initial value is always the unix epoch in the
// UTC timezone. The FakeClock returned is thread-safe.
@ -82,7 +117,8 @@ type FakeClock interface {
// but the clock's time will never be adjusted.
type fake struct {
sync.RWMutex
t time.Time
t time.Time
sends sortedSends
}
func (f *fake) Now() time.Time {
@ -99,14 +135,99 @@ func (f *fake) Sleep(d time.Duration) {
f.Add(d)
}
func (f *fake) After(d time.Duration) <-chan time.Time {
return f.NewTimer(d).C
}
func (f *fake) Since(t time.Time) time.Duration {
return f.Now().Sub(t)
}
func (f *fake) NewTimer(d time.Duration) *Timer {
f.Lock()
defer f.Unlock()
ch := make(chan time.Time, 1)
tt := f.t.Add(d)
ft := &fakeTimer{c: ch, clk: f, active: true}
t := &Timer{
C: ch,
fakeTimer: ft,
}
s := f.addSend(tt, ft)
ft.sends = []*send{s}
return t
}
func (f *fake) Add(d time.Duration) {
f.Lock()
defer f.Unlock()
f.t = f.t.Add(d)
f.sendTimes()
}
func (f *fake) Set(t time.Time) {
f.Lock()
defer f.Unlock()
f.t = t
f.sendTimes()
}
// Only to be called while the fake's lock is held
func (f *fake) sendTimes() {
newSends := make(sortedSends, 0)
for _, s := range f.sends {
if !s.active || !s.ft.active {
continue
}
if s.target.Equal(f.t) || s.target.Before(f.t) {
s.ft.active = false
s.active = false
// The select is to drop second sends from resets without a user
// receiving from ft.c.
select {
case s.ft.c <- s.target:
default:
}
}
if s.active {
newSends = append(newSends, s)
}
}
f.sends = newSends
}
// Only to be called while the fake's lock is held
func (f *fake) addSend(target time.Time, ft *fakeTimer) *send {
s := &send{target: target, ft: ft, active: true}
f.sends = append(f.sends, s)
// This will be a small enough slice to be fast. Can be replaced with a more
// complicated container if someone is making many timers.
sort.Sort(f.sends)
return s
}
// send is a struct that represents a scheduled send of a time.Time to its
// fakeTimer's channel. They are actually sent when the relevant fake's time
// goes equal or past their target time, as long as the relevant fakeTimer has
// not been Reset or Stop'ed. When a Timer is Reset, the old sends are
// deactivated and will be removed from the clocks list on the next attempt to
// send.
type send struct {
target time.Time
active bool
ft *fakeTimer
}
type sortedSends []*send
func (s sortedSends) Len() int {
return len(s)
}
func (s sortedSends) Less(i, j int) bool {
return s[i].target.Before(s[j].target)
}
func (s sortedSends) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

66
vendor/github.com/jmhodges/clock/timer.go generated vendored Normal file
View File

@ -0,0 +1,66 @@
package clock
import "time"
type Timer struct {
C <-chan time.Time
timer *time.Timer
fakeTimer *fakeTimer
}
func (t *Timer) Reset(d time.Duration) bool {
if t.timer != nil {
return t.timer.Reset(d)
}
return t.fakeTimer.Reset(d)
}
func (t *Timer) Stop() bool {
if t.timer != nil {
return t.timer.Stop()
}
return t.fakeTimer.Stop()
}
type fakeTimer struct {
// c is the same chan as C in the Timer that contains this fakeTimer
c chan<- time.Time
// clk is kept so we can maintain just one lock and to add and attempt to
// send the times made by this timer during Resets and Stops
clk *fake
// active is true until the fakeTimer's send is attempted or it has been
// stopped
active bool
// sends is where we store all the sends made by this timer so we can
// deactivate the old ones when Reset or Stop is called.
sends []*send
}
func (ft *fakeTimer) Reset(d time.Duration) bool {
ft.clk.Lock()
defer ft.clk.Unlock()
target := ft.clk.t.Add(d)
active := ft.active
ft.active = true
for _, s := range ft.sends {
s.active = false
}
s := ft.clk.addSend(target, ft)
ft.sends = []*send{s}
ft.clk.sendTimes()
return active
}
func (ft *fakeTimer) Stop() bool {
ft.clk.Lock()
defer ft.clk.Unlock()
active := ft.active
ft.active = false
for _, s := range ft.sends {
s.active = false
}
ft.sends = nil
ft.clk.sendTimes()
return active
}

View File

@ -1,17 +1,18 @@
language: go
go:
- 1.5.3
- 1.6
- 1.5.4
- 1.6.3
- 1.7
go_import_path: google.golang.org/grpc
before_install:
- go get github.com/axw/gocov/gocov
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
install:
- mkdir -p "$GOPATH/src/google.golang.org"
- mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc"
- go get -u golang.org/x/tools/cmd/goimports github.com/golang/lint/golint github.com/axw/gocov/gocov github.com/mattn/goveralls golang.org/x/tools/cmd/cover
script:
- '! gofmt -s -d -l . 2>&1 | read'
- '! goimports -l . | read'
- '! golint ./... | grep -vE "(_string|\.pb)\.go:"'
- '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf" | grep -vF .pb.go:' # https://github.com/golang/protobuf/issues/214
- make test testrace

View File

@ -28,5 +28,5 @@ See [API documentation](https://godoc.org/google.golang.org/grpc) for package an
Status
------
Beta release
GA

View File

@ -40,7 +40,6 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/transport"
)
// Address represents a server the client connects to.
@ -94,10 +93,10 @@ type Balancer interface {
// instead of blocking.
//
// The function returns put which is called once the rpc has completed or failed.
// put can collect and report RPC stats to a remote load balancer. gRPC internals
// will try to call this again if err is non-nil (unless err is ErrClientConnClosing).
// put can collect and report RPC stats to a remote load balancer.
//
// TODO: Add other non-recoverable errors?
// This function should only return the errors Balancer cannot recover by itself.
// gRPC internals will fail the RPC if an error is returned.
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
// Notify returns a channel that is used by gRPC internals to watch the addresses
// gRPC needs to connect. The addresses might be from a name resolver or remote
@ -158,14 +157,15 @@ type roundRobin struct {
func (rr *roundRobin) watchAddrUpdates() error {
updates, err := rr.w.Next()
if err != nil {
grpclog.Println("grpc: the naming watcher stops working due to %v.", err)
grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
return err
}
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
Addr: update.Addr,
Addr: update.Addr,
Metadata: update.Metadata,
}
switch update.Op {
case naming.Add:
@ -298,8 +298,19 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
}
}
}
// There is no address available. Wait on rr.waitCh.
// TODO(zhaoq): Handle the case when opts.BlockingWait is false.
if !opts.BlockingWait {
if len(rr.addrs) == 0 {
rr.mu.Unlock()
err = fmt.Errorf("there is no address available")
return
}
// Returns the next addr on rr.addrs for failfast RPCs.
addr = rr.addrs[rr.next].addr
rr.next++
rr.mu.Unlock()
return
}
// Wait on rr.waitCh for non-failfast RPCs.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
@ -310,7 +321,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
for {
select {
case <-ctx.Done():
err = transport.ContextErr(ctx.Err())
err = ctx.Err()
return
case <-ch:
rr.mu.Lock()

View File

@ -36,6 +36,7 @@ package grpc
import (
"bytes"
"io"
"math"
"time"
"golang.org/x/net/context"
@ -51,13 +52,20 @@ import (
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
// Try to acquire header metadata from the server if there is any.
var err error
defer func() {
if err != nil {
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
}
}()
c.headerMD, err = stream.Header()
if err != nil {
return err
}
p := &parser{r: stream}
for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply); err != nil {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
if err == io.EOF {
break
}
@ -76,6 +84,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
defer func() {
if err != nil {
// If err is connection error, t will be closed, no need to close stream here.
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
@ -87,10 +96,13 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
outBuf, err := encode(codec, args, compressor, cbuf)
if err != nil {
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
if err != nil {
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
// recvResponse to get the final status.
if err != nil && err != io.EOF {
return nil, err
}
// Sent successfully.
@ -100,8 +112,15 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
// Invoke sends the RPC request on the wire and returns after response is received.
// Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
var c callInfo
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
c := defaultCallInfo
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
@ -155,19 +174,17 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if err == ErrClientConnClosing {
return Errorf(codes.FailedPrecondition, "%v", err)
if _, ok := err.(*rpcError); ok {
return err
}
if _, ok := err.(transport.StreamError); ok {
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); ok {
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return toRPCErr(err)
return Errorf(codes.Unavailable, "%v", err)
}
continue
}
// All the remaining cases are treated as retryable.
continue
// All the other errors are treated as Internal errors.
return Errorf(codes.Internal, "%v", err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
@ -178,7 +195,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok {
// Retry a non-failfast RPC when
// i) there is a connection error; or
// ii) the server started to drain before this RPC was initiated.
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
@ -186,20 +206,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
// Receive the response
err = recvResponse(cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok {
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
t.CloseStream(stream, err)
return toRPCErr(err)
}
if c.traceInfo.tr != nil {

View File

@ -43,7 +43,6 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/transport"
@ -68,13 +67,15 @@ var (
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIP indicates that the connection is down due to some network I/O error.
// errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
errNoAddr = errors.New("grpc: there is no address available to dial")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@ -82,15 +83,17 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
copts transport.ConnectOptions
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
copts transport.ConnectOptions
}
// DialOption configures how we set up the connection.
@ -196,9 +199,14 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = f
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
if deadline, ok := ctx.Deadline(); ok {
return f(addr, deadline.Sub(time.Now()))
}
return f(addr, 0)
}
}
}
@ -209,49 +217,86 @@ func WithUserAgent(s string) DialOption {
}
}
// Dial creates a client connection the given target.
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
o.unaryInt = f
}
}
// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
return func(o *dialOptions) {
o.streamInt = f
}
}
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
// DialContext creates a client connection to the given target. ctx can be used to
// cancel or expire the pending connecting. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
if err != nil {
cc.Close()
}
}()
for _, opt := range opts {
opt(&cc.dopts)
}
// Set defaults.
if cc.dopts.codec == nil {
// Set the default codec.
cc.dopts.codec = protoCodec{}
}
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
cc.balancer = cc.dopts.balancer
if cc.balancer == nil {
cc.balancer = RoundRobin(nil)
}
if err := cc.balancer.Start(target); err != nil {
return nil, err
}
var (
ok bool
addrs []Address
)
ch := cc.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
return nil, errNoAddr
if err := cc.dopts.balancer.Start(target); err != nil {
return nil, err
}
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
return nil, errNoAddr
}
}
}
waitC := make(chan error, 1)
go func() {
for _, a := range addrs {
if err := cc.newAddrConn(a, false); err != nil {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
@ -263,15 +308,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-waitC:
if err != nil {
cc.Close()
return nil, err
}
case <-timeoutCh:
cc.Close()
return nil, ErrClientConnTimeout
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
@ -318,8 +365,10 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
balancer Balancer
authority string
dopts dialOptions
@ -328,7 +377,7 @@ type ClientConn struct {
}
func (cc *ClientConn) lbWatcher() {
for addrs := range cc.balancer.Notify() {
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
del []*addrConn // Connections need to tear down.
@ -349,11 +398,12 @@ func (cc *ClientConn) lbWatcher() {
}
if !keep {
del = append(del, c)
delete(cc.conns, c.addr)
}
}
cc.mu.Unlock()
for _, a := range add {
cc.newAddrConn(a, true)
cc.resetAddrConn(a, true, nil)
}
for _, c := range del {
c.tearDown(errConnDrain)
@ -361,13 +411,17 @@ func (cc *ClientConn) lbWatcher() {
}
}
func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
shutdownChan: make(chan struct{}),
cc: cc,
addr: addr,
dopts: cc.dopts,
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
@ -385,26 +439,44 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
}
}
// Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called.
ac.cc.mu.Lock()
if ac.cc.conns == nil {
ac.cc.mu.Unlock()
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
stale := ac.cc.conns[ac.addr]
ac.cc.conns[ac.addr] = ac
ac.cc.mu.Unlock()
stale := cc.conns[ac.addr]
cc.conns[ac.addr] = ac
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
// i) stale's Close is undergoing;
// ii) a buggy Balancer notifies duplicated Addresses.
stale.tearDown(errConnDrain)
// 1) a buggy Balancer notifies duplicated Addresses;
// 2) goaway was received, a new ac will replace the old ac.
// The old ac should be deleted from cc.conns, but the
// underlying transport should drain rather than close.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
// 2) lbWatcher
// In both cases, the stale ac should drain, not close.
stale.tearDown(errConnDrain)
} else {
stale.tearDown(tearDownErr)
}
}
ac.stateCV = sync.NewCond(&ac.mu)
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
ac.tearDown(err)
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
cc.mu.Lock()
delete(cc.conns, ac.addr)
cc.mu.Unlock()
ac.tearDown(err)
}
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return e.Origin()
}
return err
}
// Start to monitor the error status of transport.
@ -414,7 +486,10 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
go func() {
if err := ac.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
ac.tearDown(err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
ac.transportMonitor()
@ -424,25 +499,48 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
// TODO(zhaoq): Implement fail-fast logic.
addr, put, err := cc.balancer.Get(ctx, opts)
if err != nil {
return nil, nil, err
}
cc.mu.RLock()
if cc.conns == nil {
var (
ac *addrConn
ok bool
put func()
)
if cc.dopts.balancer == nil {
// If balancer is nil, there should be only one addrConn available.
cc.mu.RLock()
if cc.conns == nil {
cc.mu.RUnlock()
return nil, nil, toRPCErr(ErrClientConnClosing)
}
for _, ac = range cc.conns {
// Break after the first iteration to get the first addrConn.
ok = true
break
}
cc.mu.RUnlock()
} else {
var (
addr Address
err error
)
addr, put, err = cc.dopts.balancer.Get(ctx, opts)
if err != nil {
return nil, nil, toRPCErr(err)
}
cc.mu.RLock()
if cc.conns == nil {
cc.mu.RUnlock()
return nil, nil, toRPCErr(ErrClientConnClosing)
}
ac, ok = cc.conns[addr]
cc.mu.RUnlock()
return nil, nil, ErrClientConnClosing
}
ac, ok := cc.conns[addr]
cc.mu.RUnlock()
if !ok {
if put != nil {
put()
}
return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
return nil, nil, errConnClosing
}
t, err := ac.wait(ctx)
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
if err != nil {
if put != nil {
put()
@ -454,6 +552,8 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
cc.cancel()
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
@ -462,7 +562,9 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.mu.Unlock()
cc.balancer.Close()
if cc.dopts.balancer != nil {
cc.dopts.balancer.Close()
}
for _, ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
@ -471,11 +573,13 @@ func (cc *ClientConn) Close() error {
// addrConn is a network connection to a given address.
type addrConn struct {
cc *ClientConn
addr Address
dopts dialOptions
shutdownChan chan struct{}
events trace.EventLog
ctx context.Context
cancel context.CancelFunc
cc *ClientConn
addr Address
dopts dialOptions
events trace.EventLog
mu sync.Mutex
state ConnectivityState
@ -485,6 +589,9 @@ type addrConn struct {
// due to timeout.
ready chan struct{}
transport transport.ClientTransport
// The reason this addrConn is torn down.
tearDownErr error
}
// printf records an event in ac's event log, unless ac has been closed.
@ -540,8 +647,7 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
}
func (ac *addrConn) resetTransport(closeTransport bool) error {
var retries int
for {
for retries := 0; ; retries++ {
ac.mu.Lock()
ac.printf("connecting")
if ac.state == Shutdown {
@ -561,13 +667,20 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
t.Close()
}
sleepTime := ac.dopts.bs.backoff(retries)
ac.dopts.copts.Timeout = sleepTime
if sleepTime < minConnectTimeout {
ac.dopts.copts.Timeout = minConnectTimeout
timeout := minConnectTimeout
if timeout < sleepTime {
timeout = sleepTime
}
ctx, cancel := context.WithTimeout(ac.ctx, timeout)
connectTime := time.Now()
newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts)
newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
if err != nil {
cancel()
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return err
}
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
@ -582,17 +695,12 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.ready = nil
}
ac.mu.Unlock()
sleepTime -= time.Since(connectTime)
if sleepTime < 0 {
sleepTime = 0
}
closeTransport = false
select {
case <-time.After(sleepTime):
case <-ac.shutdownChan:
case <-time.After(sleepTime - time.Since(connectTime)):
case <-ac.ctx.Done():
return ac.ctx.Err()
}
retries++
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
continue
}
ac.mu.Lock()
@ -610,7 +718,9 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
close(ac.ready)
ac.ready = nil
}
ac.down = ac.cc.balancer.Up(ac.addr)
if ac.cc.dopts.balancer != nil {
ac.down = ac.cc.dopts.balancer.Up(ac.addr)
}
ac.mu.Unlock()
return nil
}
@ -624,14 +734,42 @@ func (ac *addrConn) transportMonitor() {
t := ac.transport
ac.mu.Unlock()
select {
// shutdownChan is needed to detect the teardown when
// This is needed to detect the teardown when
// the addrConn is idle (i.e., no RPC in flight).
case <-ac.shutdownChan:
case <-ac.ctx.Done():
select {
case <-t.Error():
t.Close()
default:
}
return
case <-t.GoAway():
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
// are closed.
// In both cases, a new ac is created.
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
}
return
case <-t.Error():
select {
case <-ac.ctx.Done():
t.Close()
return
case <-t.GoAway():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
return
default:
}
ac.mu.Lock()
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
// ac has been shutdown.
ac.mu.Unlock()
return
}
@ -643,38 +781,53 @@ func (ac *addrConn) transportMonitor() {
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
}
}
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed.
func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) {
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there's no balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
case ac.state == Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
ac.mu.Unlock()
return nil, err
}
ac.mu.Unlock()
return nil, errConnClosing
case ac.state == Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
default:
ready := ac.ready
if ready == nil {
ready = make(chan struct{})
ac.ready = ready
}
ac.mu.Unlock()
select {
case <-ctx.Done():
return nil, transport.ContextErr(ctx.Err())
// Wait until the new transport is ready or failed.
case <-ready:
case ac.state == TransientFailure:
if failfast || hasBalancer {
ac.mu.Unlock()
return nil, errConnUnavailable
}
}
ready := ac.ready
if ready == nil {
ready = make(chan struct{})
ac.ready = ready
}
ac.mu.Unlock()
select {
case <-ctx.Done():
return nil, toRPCErr(ctx.Err())
// Wait until the new transport is ready or failed.
case <-ready:
}
}
}
@ -682,24 +835,28 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
ac.cancel()
ac.mu.Lock()
defer func() {
ac.mu.Unlock()
ac.cc.mu.Lock()
if ac.cc.conns != nil {
delete(ac.cc.conns, ac.addr)
}
ac.cc.mu.Unlock()
}()
if ac.state == Shutdown {
return
}
ac.state = Shutdown
defer ac.mu.Unlock()
if ac.down != nil {
ac.down(downErrorf(false, false, "%v", err))
ac.down = nil
}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
ac.transport.GracefulClose()
}
if ac.state == Shutdown {
return
}
ac.state = Shutdown
ac.tearDownErr = err
ac.stateCV.Broadcast()
if ac.events != nil {
ac.events.Finish()
@ -709,15 +866,8 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
if ac.transport != nil {
if err == errConnDrain {
ac.transport.GracefulClose()
} else {
ac.transport.Close()
}
}
if ac.shutdownChan != nil {
close(ac.shutdownChan)
if ac.transport != nil && err != errConnDrain {
ac.transport.Close()
}
return
}

0
vendor/google.golang.org/grpc/codegen.sh generated vendored Normal file → Executable file
View File

0
vendor/google.golang.org/grpc/coverage.sh generated vendored Normal file → Executable file
View File

View File

@ -40,11 +40,11 @@ package credentials
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
"strings"
"time"
"golang.org/x/net/context"
)
@ -87,17 +87,24 @@ type AuthInfo interface {
AuthType() string
}
var (
// ErrConnDispatched indicates that rawConn has been dispatched out of gRPC
// and the caller should not close rawConn.
ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC")
)
// TransportCredentials defines the common interface for all the live gRPC wire
// protocols and supported transport security protocols (e.g., TLS, SSL).
type TransportCredentials interface {
// ClientHandshake does the authentication handshake specified by the corresponding
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (net.Conn, AuthInfo, error)
// Implementations must use the provided context to implement timely cancellation.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about
// the connection.
ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
// Info provides the ProtocolInfo of this TransportCredentials.
Info() ProtocolInfo
}
@ -116,7 +123,7 @@ func (t TLSInfo) AuthType() string {
// tlsCreds is the credentials required for authenticating a connection using TLS.
type tlsCreds struct {
// TLS configuration
config tls.Config
config *tls.Config
}
func (c tlsCreds) Info() ProtocolInfo {
@ -136,40 +143,28 @@ func (c *tlsCreds) RequireTransportSecurity() bool {
return true
}
type timeoutError struct{}
func (timeoutError) Error() string { return "credentials: Dial timed out" }
func (timeoutError) Timeout() bool { return true }
func (timeoutError) Temporary() bool { return true }
func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (_ net.Conn, _ AuthInfo, err error) {
// borrow some code from tls.DialWithDialer
var errChannel chan error
if timeout != 0 {
errChannel = make(chan error, 2)
time.AfterFunc(timeout, func() {
errChannel <- timeoutError{}
})
}
if c.config.ServerName == "" {
func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
// use local cfg to avoid clobbering ServerName if using multiple endpoints
cfg := cloneTLSConfig(c.config)
if cfg.ServerName == "" {
colonPos := strings.LastIndex(addr, ":")
if colonPos == -1 {
colonPos = len(addr)
}
c.config.ServerName = addr[:colonPos]
cfg.ServerName = addr[:colonPos]
}
conn := tls.Client(rawConn, &c.config)
if timeout == 0 {
err = conn.Handshake()
} else {
go func() {
errChannel <- conn.Handshake()
}()
err = <-errChannel
}
if err != nil {
rawConn.Close()
return nil, nil, err
conn := tls.Client(rawConn, cfg)
errChannel := make(chan error, 1)
go func() {
errChannel <- conn.Handshake()
}()
select {
case err := <-errChannel:
if err != nil {
return nil, nil, err
}
case <-ctx.Done():
return nil, nil, ctx.Err()
}
// TODO(zhaoq): Omit the auth info for client now. It is more for
// information than anything else.
@ -177,9 +172,8 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D
}
func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {
conn := tls.Server(rawConn, &c.config)
conn := tls.Server(rawConn, c.config)
if err := conn.Handshake(); err != nil {
rawConn.Close()
return nil, nil, err
}
return conn, TLSInfo{conn.ConnectionState()}, nil
@ -187,7 +181,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
// NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials {
tc := &tlsCreds{*c}
tc := &tlsCreds{cloneTLSConfig(c)}
tc.config.NextProtos = alpnProtoStr
return tc
}

View File

@ -0,0 +1,76 @@
// +build go1.7
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package credentials
import (
"crypto/tls"
)
// cloneTLSConfig returns a shallow clone of the exported
// fields of cfg, ignoring the unexported sync.Once, which
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
//
// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
}
return &tls.Config{
Rand: cfg.Rand,
Time: cfg.Time,
Certificates: cfg.Certificates,
NameToCertificate: cfg.NameToCertificate,
GetCertificate: cfg.GetCertificate,
RootCAs: cfg.RootCAs,
NextProtos: cfg.NextProtos,
ServerName: cfg.ServerName,
ClientAuth: cfg.ClientAuth,
ClientCAs: cfg.ClientCAs,
InsecureSkipVerify: cfg.InsecureSkipVerify,
CipherSuites: cfg.CipherSuites,
PreferServerCipherSuites: cfg.PreferServerCipherSuites,
SessionTicketsDisabled: cfg.SessionTicketsDisabled,
SessionTicketKey: cfg.SessionTicketKey,
ClientSessionCache: cfg.ClientSessionCache,
MinVersion: cfg.MinVersion,
MaxVersion: cfg.MaxVersion,
CurvePreferences: cfg.CurvePreferences,
DynamicRecordSizingDisabled: cfg.DynamicRecordSizingDisabled,
Renegotiation: cfg.Renegotiation,
}
}

View File

@ -0,0 +1,74 @@
// +build !go1.7
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package credentials
import (
"crypto/tls"
)
// cloneTLSConfig returns a shallow clone of the exported
// fields of cfg, ignoring the unexported sync.Once, which
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
//
// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
}
return &tls.Config{
Rand: cfg.Rand,
Time: cfg.Time,
Certificates: cfg.Certificates,
NameToCertificate: cfg.NameToCertificate,
GetCertificate: cfg.GetCertificate,
RootCAs: cfg.RootCAs,
NextProtos: cfg.NextProtos,
ServerName: cfg.ServerName,
ClientAuth: cfg.ClientAuth,
ClientCAs: cfg.ClientCAs,
InsecureSkipVerify: cfg.InsecureSkipVerify,
CipherSuites: cfg.CipherSuites,
PreferServerCipherSuites: cfg.PreferServerCipherSuites,
SessionTicketsDisabled: cfg.SessionTicketsDisabled,
SessionTicketKey: cfg.SessionTicketKey,
ClientSessionCache: cfg.ClientSessionCache,
MinVersion: cfg.MinVersion,
MaxVersion: cfg.MaxVersion,
CurvePreferences: cfg.CurvePreferences,
}
}

View File

@ -37,6 +37,22 @@ import (
"golang.org/x/net/context"
)
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. inovker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
// Streamer is called by StreamClientInterceptor to create a ClientStream.
type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)
// StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O
// operations. streamer is the handlder to create a ClientStream and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
// UnaryServerInfo consists of various information about a unary RPC on
// server side. All per-rpc information may be mutated by the interceptor.
type UnaryServerInfo struct {

View File

@ -60,15 +60,21 @@ func encodeKeyValue(k, v string) (string, string) {
// DecodeKeyValue returns the original key and value corresponding to the
// encoded data in k, v.
// If k is a binary header and v contains comma, v is split on comma before decoded,
// and the decoded v will be joined with comma before returned.
func DecodeKeyValue(k, v string) (string, string, error) {
if !strings.HasSuffix(k, binHdrSuffix) {
return k, v, nil
}
val, err := base64.StdEncoding.DecodeString(v)
if err != nil {
return "", "", err
vvs := strings.Split(v, ",")
for i, vv := range vvs {
val, err := base64.StdEncoding.DecodeString(vv)
if err != nil {
return "", "", err
}
vvs[i] = string(val)
}
return k, string(val), nil
return k, strings.Join(vvs, ","), nil
}
// MD is a mapping from metadata keys to values. Users should use the following

View File

@ -141,6 +141,8 @@ type callInfo struct {
traceInfo traceInfo // in trace.go
}
var defaultCallInfo = callInfo{failFast: true}
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
@ -179,6 +181,19 @@ func Trailer(md *metadata.MD) CallOption {
})
}
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
return nil
})
}
// The format of the payload: compressed or not?
type payloadFormat uint8
@ -212,7 +227,7 @@ type parser struct {
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
return 0, nil, err
}
@ -223,6 +238,9 @@ func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
if length == 0 {
return pf, nil, nil
}
if length > uint32(maxMsgSize) {
return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
@ -285,16 +303,16 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
case compressionNone:
case compressionMade:
if dc == nil || recvCompress != dc.Type() {
return transport.StreamErrorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return transport.StreamErrorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}) error {
pf, d, err := p.recvMsg()
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
@ -304,11 +322,16 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if pf == compressionMade {
d, err = dc.Do(bytes.NewReader(d))
if err != nil {
return transport.StreamErrorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
if len(d) > maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
}
if err := c.Unmarshal(d, m); err != nil {
return transport.StreamErrorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
return nil
}
@ -319,7 +342,7 @@ type rpcError struct {
desc string
}
func (e rpcError) Error() string {
func (e *rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
}
@ -329,7 +352,7 @@ func Code(err error) codes.Code {
if err == nil {
return codes.OK
}
if e, ok := err.(rpcError); ok {
if e, ok := err.(*rpcError); ok {
return e.code
}
return codes.Unknown
@ -341,7 +364,7 @@ func ErrorDesc(err error) string {
if err == nil {
return ""
}
if e, ok := err.(rpcError); ok {
if e, ok := err.(*rpcError); ok {
return e.desc
}
return err.Error()
@ -353,7 +376,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
if c == codes.OK {
return nil
}
return rpcError{
return &rpcError{
code: c,
desc: fmt.Sprintf(format, a...),
}
@ -362,18 +385,37 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
// toRPCErr converts an error into a rpcError.
func toRPCErr(err error) error {
switch e := err.(type) {
case rpcError:
case *rpcError:
return err
case transport.StreamError:
return rpcError{
return &rpcError{
code: e.Code,
desc: e.Desc,
}
case transport.ConnectionError:
return rpcError{
return &rpcError{
code: codes.Internal,
desc: e.Desc,
}
default:
switch err {
case context.DeadlineExceeded:
return &rpcError{
code: codes.DeadlineExceeded,
desc: err.Error(),
}
case context.Canceled:
return &rpcError{
code: codes.Canceled,
desc: err.Error(),
}
case ErrClientConnClosing:
return &rpcError{
code: codes.FailedPrecondition,
desc: err.Error(),
}
}
}
return Errorf(codes.Unknown, "%v", err)
}

View File

@ -82,15 +82,20 @@ type service struct {
server interface{} // the server for service methods
md map[string]*MethodDesc
sd map[string]*StreamDesc
mdata interface{}
}
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts options
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
drain bool
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
// and all the transport goes away.
cv *sync.Cond
m map[string]*service // service name -> service info
events trace.EventLog
}
@ -100,12 +105,15 @@ type options struct {
codec Codec
cp Compressor
dc Decompressor
maxMsgSize int
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
// A ServerOption sets options.
type ServerOption func(*options)
@ -116,20 +124,28 @@ func CustomCodec(codec Codec) ServerOption {
}
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound message.
// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
o.cp = cp
}
}
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound message.
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
o.dc = dc
}
}
// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
// If this is not set, gRPC uses the default 4MB.
func MaxMsgSize(m int) ServerOption {
return func(o *options) {
o.maxMsgSize = m
}
}
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
@ -172,6 +188,7 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
var opts options
opts.maxMsgSize = defaultMaxMsgSize
for _, o := range opt {
o(&opts)
}
@ -185,6 +202,7 @@ func NewServer(opt ...ServerOption) *Server {
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
}
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@ -231,6 +249,7 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
@ -243,6 +262,52 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.m[sd.ServiceName] = srv
}
// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
// Name is the method name only, without the service name or package name.
Name string
// IsClientStream indicates whether the RPC is a client streaming RPC.
IsClientStream bool
// IsServerStream indicates whether the RPC is a server streaming RPC.
IsServerStream bool
}
// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
type ServiceInfo struct {
Methods []MethodInfo
// Metadata is the metadata specified in ServiceDesc when registering service.
Metadata interface{}
}
// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
ret := make(map[string]ServiceInfo)
for n, srv := range s.m {
methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
for m := range srv.md {
methods = append(methods, MethodInfo{
Name: m,
IsClientStream: false,
IsServerStream: false,
})
}
for m, d := range srv.sd {
methods = append(methods, MethodInfo{
Name: m,
IsClientStream: d.ClientStreams,
IsServerStream: d.ServerStreams,
})
}
ret[n] = ServiceInfo{
Methods: methods,
Metadata: srv.mdata,
}
}
return ret
}
var (
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
@ -259,7 +324,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Service returns when lis.Accept fails. lis will be closed when
// Serve returns when lis.Accept fails. lis will be closed when
// this method returns.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
@ -272,9 +337,11 @@ func (s *Server) Serve(lis net.Listener) error {
s.lis[lis] = true
s.mu.Unlock()
defer func() {
lis.Close()
s.mu.Lock()
delete(s.lis, lis)
if s.lis != nil && s.lis[lis] {
lis.Close()
delete(s.lis, lis)
}
s.mu.Unlock()
}()
for {
@ -300,7 +367,10 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
// If serverHandShake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
}
return
}
@ -418,7 +488,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
func (s *Server) addConn(c io.Closer) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
if s.conns == nil || s.drain {
return false
}
s.conns[c] = true
@ -430,6 +500,7 @@ func (s *Server) removeConn(c io.Closer) {
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
s.cv.Signal()
}
}
@ -470,16 +541,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
p := &parser{r: stream}
for {
pf, req, err := p.recvMsg()
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
}
if err == io.ErrUnexpectedEOF {
err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
switch err := err.(type) {
case *rpcError:
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
@ -494,8 +569,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case transport.StreamError:
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
case *rpcError:
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
default:
@ -519,6 +594,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return err
}
}
if len(req) > s.opts.maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with
// java implementation.
statusCode = codes.Internal
statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
}
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
@ -529,7 +610,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
if appErr != nil {
if err, ok := appErr.(rpcError); ok {
if err, ok := appErr.(*rpcError); ok {
statusCode = err.code
statusDesc = err.desc
} else {
@ -578,13 +659,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
cp: s.opts.cp,
dc: s.opts.dc,
trInfo: trInfo,
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
cp: s.opts.cp,
dc: s.opts.dc,
maxMsgSize: s.opts.maxMsgSize,
trInfo: trInfo,
}
if ss.cp != nil {
ss.cbuf = new(bytes.Buffer)
@ -614,7 +696,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
}
if appErr != nil {
if err, ok := appErr.(rpcError); ok {
if err, ok := appErr.(*rpcError); ok {
ss.statusCode = err.code
ss.statusDesc = err.desc
} else if err, ok := appErr.(transport.StreamError); ok {
@ -716,14 +798,16 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
cs := s.conns
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Signal()
s.mu.Unlock()
for lis := range listeners {
lis.Close()
}
for c := range cs {
for c := range st {
c.Close()
}
@ -735,6 +819,32 @@ func (s *Server) Stop() {
s.mu.Unlock()
}
// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
// connections and RPCs and blocks until all the pending RPCs are finished.
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.drain == true || s.conns == nil {
return
}
s.drain = true
for lis := range s.lis {
lis.Close()
}
s.lis = nil
for c := range s.conns {
c.(transport.ServerTransport).Drain()
}
for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil
if s.events != nil {
s.events.Finish()
s.events = nil
}
}
func init() {
internal.TestingCloseConns = func(arg interface{}) {
arg.(*Server).testingCloseConns()

View File

@ -37,6 +37,7 @@ import (
"bytes"
"errors"
"io"
"math"
"sync"
"time"
@ -84,12 +85,9 @@ type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server. It must be called
// after stream.Recv() returns non-nil error (including io.EOF) for
// bi-directional streaming and server streaming or stream.CloseAndRecv()
// returns for client streaming in order to receive trailer metadata if
// present. Otherwise, it could returns an empty MD even though trailer
// is present.
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met.
@ -100,18 +98,23 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
return newClientStream(ctx, desc, cc, method, opts...)
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
err error
s *transport.Stream
put func()
)
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
gopts := BalancerGetOptions{
BlockingWait: false,
}
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
return nil, toRPCErr(err)
c := defaultCallInfo
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
}
}
callHdr := &transport.CallHdr{
Host: cc.authority,
@ -121,41 +124,98 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
var trInfo traceInfo
if EnableTracing {
trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
trInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = deadline.Sub(time.Now())
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = trace.NewContext(ctx, trInfo.tr)
defer func() {
if err != nil {
// Need to call tr.finish() if error is returned.
// Because tr will not be returned to caller.
trInfo.tr.LazyPrintf("RPC: [%v]", err)
trInfo.tr.SetError()
trInfo.tr.Finish()
}
}()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
for {
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(*rpcError); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return nil, Errorf(codes.Unavailable, "%v", err)
}
continue
}
// All the other errors are treated as Internal errors.
return nil, Errorf(codes.Internal, "%v", err)
}
s, err = t.NewStream(ctx, callHdr)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return nil, toRPCErr(err)
}
continue
}
return nil, toRPCErr(err)
}
break
}
cs := &clientStream{
desc: desc,
put: put,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
put: put,
t: t,
s: s,
p: &parser{r: s},
tracing: EnableTracing,
trInfo: trInfo,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
cs.cbuf = new(bytes.Buffer)
}
if cs.tracing {
cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
cs.trInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
cs.trInfo.firstLine.deadline = deadline.Sub(time.Now())
}
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
ctx = trace.NewContext(ctx, cs.trInfo.tr)
}
s, err := t.NewStream(ctx, callHdr)
if err != nil {
cs.finish(err)
return nil, toRPCErr(err)
}
cs.t = t
cs.s = s
cs.p = &parser{r: s}
// Listen on ctx.Done() to detect cancellation when there is no pending
// I/O operations on this stream.
// Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
// when there is no pending I/O operations on this stream.
go func() {
select {
case <-t.Error():
// Incur transport error, simply exit.
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
if s.StatusCode() == codes.OK {
cs.finish(nil)
} else {
cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
}
cs.closeTransportStream(nil)
case <-s.GoAway():
cs.finish(errConnDrain)
cs.closeTransportStream(errConnDrain)
case <-s.Context().Done():
err := s.Context().Err()
cs.finish(err)
@ -167,6 +227,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
@ -216,7 +278,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil {
cs.finish(err)
}
if err == nil || err == io.EOF {
if err == nil {
return
}
if err == io.EOF {
// Specialize the process for server streaming. SendMesg is only called
// once when creating the stream object. io.EOF needs to be skipped when
// the rpc is early finished (before the stream object is created.).
// TODO: It is probably better to move this into the generated code.
if !cs.desc.ClientStreams && cs.desc.ServerStreams {
err = nil
}
return
}
if _, ok := err.(transport.ConnectionError); !ok {
@ -231,13 +303,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return Errorf(codes.Internal, "grpc: %v", err)
}
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
err = recv(cs.p, cs.codec, cs.s, cs.dc, m)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@ -256,7 +328,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
return
}
// Special handling for client streaming rpc.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
@ -291,7 +363,7 @@ func (cs *clientStream) CloseSend() (err error) {
}
}()
if err == nil || err == io.EOF {
return
return nil
}
if _, ok := err.(transport.ConnectionError); !ok {
cs.closeTransportStream(err)
@ -312,15 +384,18 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
if !cs.tracing {
return
}
cs.mu.Lock()
defer cs.mu.Unlock()
for _, o := range cs.opts {
o.after(&cs.c)
}
if cs.put != nil {
cs.put()
cs.put = nil
}
if !cs.tracing {
return
}
if cs.trInfo.tr != nil {
if err == nil || err == io.EOF {
cs.trInfo.tr.LazyPrintf("RPC: [OK]")
@ -354,6 +429,7 @@ type serverStream struct {
cp Compressor
dc Decompressor
cbuf *bytes.Buffer
maxMsgSize int
statusCode codes.Code
statusDesc string
trInfo *traceInfo
@ -399,10 +475,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
err = Errorf(codes.Internal, "grpc: %v", err)
return err
}
return ss.t.Write(ss.s, out, &transport.Options{Last: false})
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
return nil
}
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
@ -420,5 +499,14 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
return recv(ss.p, ss.codec, ss.s, ss.dc, m)
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
if err == io.EOF {
return err
}
if err == io.ErrUnexpectedEOF {
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}
return nil
}

View File

@ -72,6 +72,11 @@ type resetStream struct {
func (*resetStream) item() {}
type goAway struct {
}
func (*goAway) item() {}
type flushIO struct {
}

46
vendor/google.golang.org/grpc/transport/go16.go generated vendored Normal file
View File

@ -0,0 +1,46 @@
// +build go1.6,!go1.7
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"net"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
}

46
vendor/google.golang.org/grpc/transport/go17.go generated vendored Normal file
View File

@ -0,0 +1,46 @@
// +build go1.7
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"net"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, address)
}

View File

@ -83,9 +83,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
}
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := timeoutDecode(v)
to, err := decodeTimeout(v)
if err != nil {
return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err)
return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
}
st.timeoutSet = true
st.timeout = to
@ -194,7 +194,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code,
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
if statusDesc != "" {
h.Set("Grpc-Message", statusDesc)
h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
}
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
@ -312,7 +312,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
Addr: ht.RemoteAddr(),
}
if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{*req.TLS}
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
ctx = metadata.NewContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
@ -370,6 +370,10 @@ func (ht *serverHandlerTransport) runStream() {
}
}
func (ht *serverHandlerTransport) Drain() {
panic("Drain() is not implemented")
}
// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
@ -389,5 +393,5 @@ func mapRecvMsgError(err error) error {
}
}
}
return ConnectionError{Desc: err.Error()}
return connectionErrorf(true, err, err.Error())
}

View File

@ -35,6 +35,7 @@ package transport
import (
"bytes"
"fmt"
"io"
"math"
"net"
@ -71,6 +72,9 @@ type http2Client struct {
shutdownChan chan struct{}
// errorChan is closed to notify the I/O error to the caller.
errorChan chan struct{}
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@ -97,41 +101,73 @@ type http2Client struct {
maxStreams int
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
// goAwayID records the Last-Stream-ID in the GoAway frame from the server.
goAwayID uint32
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32
}
func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
}
return dialContext(ctx, "tcp", addr)
}
func isTemporary(err error) bool {
switch err {
case io.EOF:
// Connection closures may be resolved upon retry, and are thus
// treated as temporary.
return true
case context.DeadlineExceeded:
// In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
// special case is not needed. Until then, we need to keep this
// clause.
return true
}
switch err := err.(type) {
case interface {
Temporary() bool
}:
return err.Temporary()
case interface {
Timeout() bool
}:
// Timeouts may be resolved upon retry, and are thus treated as
// temporary.
return err.Timeout()
}
return false
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err error) {
if opts.Dialer == nil {
// Set the default Dialer.
opts.Dialer = func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, timeout)
}
}
func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
startT := time.Now()
timeout := opts.Timeout
conn, connErr := opts.Dialer(addr, timeout)
if connErr != nil {
return nil, ConnectionErrorf("transport: %v", connErr)
conn, err := dial(opts.Dialer, ctx, addr)
if err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
var authInfo credentials.AuthInfo
if opts.TransportCredentials != nil {
scheme = "https"
if timeout > 0 {
timeout -= time.Since(startT)
}
conn, authInfo, connErr = opts.TransportCredentials.ClientHandshake(addr, conn, timeout)
}
if connErr != nil {
return nil, ConnectionErrorf("transport: %v", connErr)
}
defer func() {
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
if err != nil {
conn.Close()
}
}()
}(conn)
var authInfo credentials.AuthInfo
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
if err != nil {
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
temp := isTemporary(err)
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
}
ua := primaryUA
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
@ -147,6 +183,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@ -168,26 +205,29 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, ConnectionErrorf("transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
if n != len(clientPreface) {
t.Close()
return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
if initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
err = t.framer.writeSettings(true, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(initialWindowSize),
})
} else {
err = t.framer.writeSettings(true)
}
if err != nil {
t.Close()
return nil, ConnectionErrorf("transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
return nil, ConnectionErrorf("transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
go t.controller()
@ -199,6 +239,8 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
id: t.nextID,
done: make(chan struct{}),
goAway: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
@ -213,8 +255,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// Make a stream be able to cancel the pending operations by itself.
s.ctx, s.cancel = context.WithCancel(ctx)
s.dec = &recvBufferReader{
ctx: s.ctx,
recv: s.buf,
ctx: s.ctx,
goAway: s.goAway,
recv: s.buf,
}
return s
}
@ -252,12 +295,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
pos := strings.LastIndex(callHdr.Method, "/")
if pos == -1 {
return nil, StreamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
}
audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
data, err := c.GetRequestMetadata(ctx, audience)
if err != nil {
return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err)
}
for k, v := range data {
authData[k] = v
@ -268,6 +311,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
if t.state == draining {
t.mu.Unlock()
return nil, ErrStreamDrain
}
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
@ -275,7 +322,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
@ -284,7 +331,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.streamsQuota.add(sq - 1)
}
}
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok && checkStreamsQuota {
t.streamsQuota.add(1)
@ -292,6 +339,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, err
}
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
if checkStreamsQuota {
t.streamsQuota.add(1)
}
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
}
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
@ -326,7 +382,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
if timeout > 0 {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
@ -381,7 +437,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
if err != nil {
t.notifyError(err)
return nil, ConnectionErrorf("transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
t.writableChan <- 0
@ -400,22 +456,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
if t.streamsQuota != nil {
updateStreams = true
}
if t.state == draining && len(t.activeStreams) == 1 {
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
t.mu.Unlock()
t.Close()
return
}
delete(t.activeStreams, s.id)
t.mu.Unlock()
if updateStreams {
t.streamsQuota.add(1)
}
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), the caller needs
// to call cancel on the stream to interrupt the blocking on
// other goroutines.
s.cancel()
s.mu.Lock()
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
@ -432,7 +483,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
}
s.state = streamDone
s.mu.Unlock()
if _, ok := err.(StreamError); ok {
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
}
}
@ -442,13 +493,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
// accessed any more.
func (t *http2Client) Close() (err error) {
t.mu.Lock()
if t.state == reachable {
close(t.errorChan)
}
if t.state == closing {
t.mu.Unlock()
return
}
if t.state == reachable || t.state == draining {
close(t.errorChan)
}
t.state = closing
t.mu.Unlock()
close(t.shutdownChan)
@ -472,10 +523,35 @@ func (t *http2Client) Close() (err error) {
func (t *http2Client) GracefulClose() error {
t.mu.Lock()
if t.state == closing {
switch t.state {
case unreachable:
// The server may close the connection concurrently. t is not available for
// any streams. Close it now.
t.mu.Unlock()
t.Close()
return nil
case closing:
t.mu.Unlock()
return nil
}
// Notify the streams which were initiated after the server sent GOAWAY.
select {
case <-t.goAway:
n := t.prevGoAwayID
if n == 0 && t.nextID > 1 {
n = t.nextID - 2
}
m := t.goAwayID + 2
if m == 2 {
m = 1
}
for i := m; i <= n; i += 2 {
if s, ok := t.activeStreams[i]; ok {
close(s.goAway)
}
}
default:
}
if t.state == draining {
t.mu.Unlock()
return nil
@ -501,15 +577,15 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok {
if _, ok := err.(StreamError); ok || err == io.EOF {
t.sendQuotaPool.cancel()
}
return err
@ -541,8 +617,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// Indicate there is a writer who is about to write a data frame.
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the transport.
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok {
if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok || err == io.EOF {
// Return the connection quota back.
t.sendQuotaPool.add(len(p))
}
@ -575,7 +651,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// invoked.
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
t.notifyError(err)
return ConnectionErrorf("transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@ -590,11 +666,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
}
s.mu.Lock()
if s.state != streamDone {
if s.state == streamReadDone {
s.state = streamDone
} else {
s.state = streamWriteDone
}
s.state = streamWriteDone
}
s.mu.Unlock()
return nil
@ -627,7 +699,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
func (t *http2Client) handleData(f *http2.DataFrame) {
size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(ConnectionErrorf("%v", err))
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
// Select the right stream to dispatch.
@ -652,6 +724,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
@ -669,13 +742,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
s.mu.Lock()
if s.state == streamWriteDone {
s.state = streamDone
} else {
s.state = streamReadDone
if s.state == streamDone {
s.mu.Unlock()
return
}
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = "server closed the stream without sending trailers"
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@ -701,6 +775,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
s.statusCode = codes.Unknown
}
s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@ -725,7 +801,32 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// TODO(zhaoq): GoAwayFrame handler to be implemented
t.mu.Lock()
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
t.mu.Unlock()
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
return
}
select {
case <-t.goAway:
id := t.goAwayID
// t.goAway has been closed (i.e.,multiple GoAways).
if id < f.LastStreamID {
t.mu.Unlock()
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
return
}
t.prevGoAwayID = id
t.goAwayID = f.LastStreamID
t.mu.Unlock()
return
default:
}
t.goAwayID = f.LastStreamID
close(t.goAway)
}
t.mu.Unlock()
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@ -777,11 +878,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.mdata) > 0 {
s.trailer = state.mdata
}
s.state = streamDone
s.statusCode = state.statusCode
s.statusDesc = state.statusDesc
close(s.done)
s.state = streamDone
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@ -828,7 +929,7 @@ func (t *http2Client) reader() {
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
handleMalformedHTTP2(s, StreamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
}
continue
} else {
@ -934,13 +1035,22 @@ func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
defer t.mu.Unlock()
// make sure t.errorChan is closed only once.
if t.state == draining {
t.mu.Unlock()
t.Close()
return
}
if t.state == reachable {
t.state = unreachable
close(t.errorChan)
grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
}
t.mu.Unlock()
}

View File

@ -100,18 +100,23 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
settings = append(settings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: maxStreams,
})
}
if initialWindowSize != defaultWindowSize {
settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
settings = append(settings, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(initialWindowSize)})
}
if err := framer.writeSettings(true, settings...); err != nil {
return nil, ConnectionErrorf("transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
return nil, ConnectionErrorf("transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
var buf bytes.Buffer
@ -137,7 +142,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@ -200,6 +205,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
if s.id%2 != 1 || s.id <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
return true
}
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
t.mu.Unlock()
@ -207,6 +219,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.updateWindow(s, uint32(n))
}
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
@ -226,6 +239,10 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
frame, err := t.framer.readFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
if err != nil {
grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
@ -252,20 +269,20 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
id := frame.Header().StreamID
if id%2 != 1 || id <= t.maxStreamID {
// illegal gRPC stream id.
grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
if t.operateHeaders(frame, handle) {
t.Close()
break
}
t.maxStreamID = id
t.operateHeaders(frame, handle)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
@ -277,7 +294,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
break
// TODO: Handle GoAway from the client appropriately.
default:
grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
@ -359,11 +376,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// Received the end of stream from the client.
s.mu.Lock()
if s.state != streamDone {
if s.state == streamWriteDone {
s.state = streamDone
} else {
s.state = streamReadDone
}
s.state = streamReadDone
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
@ -435,7 +448,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
}
if err != nil {
t.Close()
return ConnectionErrorf("transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
}
return nil
@ -450,7 +463,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
s.headerOk = true
s.mu.Unlock()
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@ -490,7 +503,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
headersSent = true
}
s.mu.Unlock()
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@ -503,7 +516,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
Name: "grpc-status",
Value: strconv.Itoa(int(statusCode)),
})
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc})
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
// Attach the trailer metadata.
for k, v := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@ -531,7 +544,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
return StreamErrorf(codes.Unknown, "the stream has been done")
return streamErrorf(codes.Unknown, "the stream has been done")
}
if !s.headerOk {
writeHeaderFrame = true
@ -539,7 +552,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
s.mu.Unlock()
if writeHeaderFrame {
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@ -555,7 +568,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeHeaders(false, p); err != nil {
t.Close()
return ConnectionErrorf("transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
t.writableChan <- 0
}
@ -567,13 +580,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok {
t.sendQuotaPool.cancel()
@ -599,7 +612,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the
// transport.
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok {
// Return the connection quota back.
t.sendQuotaPool.add(ps)
@ -629,7 +642,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
t.Close()
return ConnectionErrorf("transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@ -674,6 +687,17 @@ func (t *http2Server) controller() {
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
case *goAway:
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
// The transport is closing.
return
}
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
case *flushIO:
t.framer.flushWrite()
case *ping:
@ -719,6 +743,9 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
defer t.Close()
}
t.mu.Unlock()
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
@ -741,3 +768,7 @@ func (t *http2Server) closeStream(s *Stream) {
func (t *http2Server) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
}
func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{})
}

View File

@ -35,6 +35,7 @@ package transport
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
@ -52,7 +53,7 @@ import (
const (
// The primary user agent
primaryUA = "grpc-go/0.11"
primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@ -161,7 +162,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
if !validContentType(f.Value) {
d.setErr(StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
return
}
case "grpc-encoding":
@ -169,18 +170,18 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
d.setErr(StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
return
}
d.statusCode = codes.Code(code)
case "grpc-message":
d.statusDesc = f.Value
d.statusDesc = decodeGrpcMessage(f.Value)
case "grpc-timeout":
d.timeoutSet = true
var err error
d.timeout, err = timeoutDecode(f.Value)
d.timeout, err = decodeTimeout(f.Value)
if err != nil {
d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
return
}
case ":path":
@ -251,7 +252,7 @@ func div(d, r time.Duration) int64 {
}
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
func timeoutEncode(t time.Duration) string {
func encodeTimeout(t time.Duration) string {
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
@ -271,7 +272,7 @@ func timeoutEncode(t time.Duration) string {
return strconv.FormatInt(div(t, time.Hour), 10) + "H"
}
func timeoutDecode(s string) (time.Duration, error) {
func decodeTimeout(s string) (time.Duration, error) {
size := len(s)
if size < 2 {
return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
@ -288,6 +289,80 @@ func timeoutDecode(s string) (time.Duration, error) {
return d * time.Duration(t), nil
}
const (
spaceByte = ' '
tildaByte = '~'
percentByte = '%'
)
// encodeGrpcMessage is used to encode status code in header field
// "grpc-message".
// It checks to see if each individual byte in msg is an
// allowable byte, and then either percent encoding or passing it through.
// When percent encoding, the byte is converted into hexadecimal notation
// with a '%' prepended.
func encodeGrpcMessage(msg string) string {
if msg == "" {
return ""
}
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if !(c >= spaceByte && c < tildaByte && c != percentByte) {
return encodeGrpcMessageUnchecked(msg)
}
}
return msg
}
func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c >= spaceByte && c < tildaByte && c != percentByte {
buf.WriteByte(c)
} else {
buf.WriteString(fmt.Sprintf("%%%02X", c))
}
}
return buf.String()
}
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
func decodeGrpcMessage(msg string) string {
if msg == "" {
return ""
}
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
if msg[i] == percentByte && i+2 < lenMsg {
return decodeGrpcMessageUnchecked(msg)
}
}
return msg
}
func decodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
} else {
buf.WriteByte(byte(parsed))
i += 2
}
} else {
buf.WriteByte(c)
}
}
return buf.String()
}
type framer struct {
numWriters int32
reader io.Reader

51
vendor/google.golang.org/grpc/transport/pre_go16.go generated vendored Normal file
View File

@ -0,0 +1,51 @@
// +build !go1.6
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"net"
"time"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
var dialer net.Dialer
if deadline, ok := ctx.Deadline(); ok {
dialer.Timeout = deadline.Sub(time.Now())
}
return dialer.Dial(network, address)
}

View File

@ -44,7 +44,6 @@ import (
"io"
"net"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
@ -120,10 +119,11 @@ func (b *recvBuffer) get() <-chan item {
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
ctx context.Context
recv *recvBuffer
last *bytes.Reader // Stores the remaining data in the previous calls.
err error
ctx context.Context
goAway chan struct{}
recv *recvBuffer
last *bytes.Reader // Stores the remaining data in the previous calls.
err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@ -141,6 +141,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
select {
case <-r.ctx.Done():
return 0, ContextErr(r.ctx.Err())
case <-r.goAway:
return 0, ErrStreamDrain
case i := <-r.recv.get():
r.recv.load()
m := i.(*recvMsg)
@ -158,7 +160,7 @@ const (
streamActive streamState = iota
streamWriteDone // EndStream sent
streamReadDone // EndStream received
streamDone // sendDone and recvDone or RSTStreamFrame is sent or received.
streamDone // the entire stream is finished.
)
// Stream represents an RPC in the transport layer.
@ -169,6 +171,10 @@ type Stream struct {
// ctx is the associated context of the stream.
ctx context.Context
cancel context.CancelFunc
// done is closed when the final status arrives.
done chan struct{}
// goAway is closed when the server sent GoAways signal before this stream was initiated.
goAway chan struct{}
// method records the associated RPC method of the stream.
method string
recvCompress string
@ -214,6 +220,18 @@ func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str
}
// Done returns a chanel which is closed when it receives the final status
// from the server.
func (s *Stream) Done() <-chan struct{} {
return s.done
}
// GoAway returns a channel which is closed when the server sent GoAways signal
// before this stream was initiated.
func (s *Stream) GoAway() <-chan struct{} {
return s.goAway
}
// Header acquires the key-value pairs of header metadata once it
// is available. It blocks until i) the metadata is ready or ii) there is no
// header metadata or iii) the stream is cancelled/expired.
@ -221,6 +239,8 @@ func (s *Stream) Header() (metadata.MD, error) {
select {
case <-s.ctx.Done():
return nil, ContextErr(s.ctx.Err())
case <-s.goAway:
return nil, ErrStreamDrain
case <-s.headerChan:
return s.header.Copy(), nil
}
@ -335,19 +355,17 @@ type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Dialer specifies how to dial a network address.
Dialer func(string, time.Duration) (net.Conn, error)
Dialer func(context.Context, string) (net.Conn, error)
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// Timeout specifies the timeout for dialing a ClientTransport.
Timeout time.Duration
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(target string, opts *ConnectOptions) (ClientTransport, error) {
return newHTTP2Client(target, opts)
func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts)
}
// Options provides additional hints and information for message
@ -417,6 +435,11 @@ type ClientTransport interface {
// and create a new one) in error case. It should not return nil
// once the transport is initiated.
Error() <-chan struct{}
// GoAway returns a channel that is closed when ClientTranspor
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}
}
// ServerTransport is the common interface for all gRPC server-side transport
@ -448,20 +471,25 @@ type ServerTransport interface {
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain()
}
// StreamErrorf creates an StreamError with the specified error code and description.
func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
// streamErrorf creates an StreamError with the specified error code and description.
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
return StreamError{
Code: c,
Desc: fmt.Sprintf(format, a...),
}
}
// ConnectionErrorf creates an ConnectionError with the specified error description.
func ConnectionErrorf(format string, a ...interface{}) ConnectionError {
// connectionErrorf creates an ConnectionError with the specified error description.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
temp: temp,
err: e,
}
}
@ -469,14 +497,36 @@ func ConnectionErrorf(format string, a ...interface{}) ConnectionError {
// entire connection and the retry of all the active streams.
type ConnectionError struct {
Desc string
temp bool
err error
}
func (e ConnectionError) Error() string {
return fmt.Sprintf("connection error: desc = %q", e.Desc)
}
// ErrConnClosing indicates that the transport is closing.
var ErrConnClosing = ConnectionError{Desc: "transport is closing"}
// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
return e.temp
}
// Origin returns the original error of this connection error.
func (e ConnectionError) Origin() error {
// Never return nil error here.
// If the original error is nil, return itself.
if e.err == nil {
return e
}
return e.err
}
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
// ErrStreamDrain indicates that the stream is rejected by the server because
// the server stops accepting new RPCs.
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
)
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
@ -492,21 +542,34 @@ func (e StreamError) Error() string {
func ContextErr(err error) StreamError {
switch err {
case context.DeadlineExceeded:
return StreamErrorf(codes.DeadlineExceeded, "%v", err)
return streamErrorf(codes.DeadlineExceeded, "%v", err)
case context.Canceled:
return StreamErrorf(codes.Canceled, "%v", err)
return streamErrorf(codes.Canceled, "%v", err)
}
panic(fmt.Sprintf("Unexpected error from context packet: %v", err))
}
// wait blocks until it can receive from ctx.Done, closing, or proceed.
// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
// it return the StreamError for ctx.Err.
// If it receives from goAway, it returns 0, ErrStreamDrain.
// If it receives from closing, it returns 0, ErrConnClosing.
// If it receives from proceed, it returns the received integer, nil.
func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int, error) {
func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
case <-done:
// User cancellation has precedence.
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
default:
}
return 0, io.EOF
case <-goAway:
return 0, ErrStreamDrain
case <-closing:
return 0, ErrConnClosing
case i := <-proceed: