merge master

This commit is contained in:
Yuxuan Li 2017-04-03 15:20:13 -07:00
commit 13b5f120b0
18 changed files with 767 additions and 217 deletions

View File

@ -1,7 +1,6 @@
language: go
go:
- 1.5.4
- 1.6.3
- 1.7
- 1.8
@ -9,12 +8,12 @@ go:
go_import_path: google.golang.org/grpc
before_install:
- if [[ $TRAVIS_GO_VERSION != 1.5* ]]; then go get github.com/golang/lint/golint; fi
- go get github.com/golang/lint/golint
- go get -u golang.org/x/tools/cmd/goimports github.com/axw/gocov/gocov github.com/mattn/goveralls golang.org/x/tools/cmd/cover
script:
- '! gofmt -s -d -l . 2>&1 | read'
- '! goimports -l . | read'
- 'if [[ $TRAVIS_GO_VERSION != 1.5* ]]; then ! golint ./... | grep -vE "(_mock|_string|\.pb)\.go:"; fi'
- '! golint ./... | grep -vE "(_mock|_string|\.pb)\.go:"'
- '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf" | grep -vF .pb.go:' # https://github.com/golang/protobuf/issues/214
- make test testrace

View File

@ -16,23 +16,7 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
This requires Go 1.5 or later.
A note on the version used: significant performance improvements in benchmarks
of grpc-go have been seen by upgrading the go version from 1.5 to the latest
1.7.1.
From https://golang.org/doc/install, one way to install the latest version of go is:
```
$ GO_VERSION=1.7.1
$ OS=linux
$ ARCH=amd64
$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz
$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz
$ # Put go on the PATH, keep the usual installation dir
$ sudo ln -s /usr/local/go/bin/go /usr/bin/go
$ rm go$GO_VERSION.$OS-$ARCH.tar.gz
```
This requires Go 1.6 or later.
Constraints
-----------

View File

@ -38,7 +38,6 @@ import (
"fmt"
"math"
"net"
"strings"
"sync"
"time"
@ -80,7 +79,6 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@ -330,6 +328,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
for _, opt := range opts {
opt(&cc.dopts)
}
grpcUA := "grpc-go/" + Version
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@ -372,23 +378,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
cc.authority = target
}
cc.authority = target[:colonPos]
}
var ok bool
waitC := make(chan error, 1)
go func() {
var addrs []Address
defer close(waitC)
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
if cc.dopts.balancer != nil {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
@ -401,24 +399,22 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return
}
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
if ch != nil {
if cc.dopts.block {
doneChan := make(chan struct{})
go cc.lbWatcher(doneChan)
<-doneChan
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
waitC <- errNoAddr
go cc.lbWatcher(nil)
}
return
}
}
}
for _, a := range addrs {
if err := cc.resetAddrConn(a, false, nil); err != nil {
// No balancer, or no resolver within the balancer. Connect directly.
if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
waitC <- err
return
}
}
close(waitC)
}()
select {
case <-ctx.Done():
@ -429,15 +425,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
return cc, nil
}
@ -488,7 +479,10 @@ type ClientConn struct {
conns map[Address]*addrConn
}
func (cc *ClientConn) lbWatcher() {
// lbWatcher watches the Notify channel of the balancer in cc and manages
// connections accordingly. If doneChan is not nil, it is closed after the
// first successfull connection is made.
func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
@ -515,7 +509,15 @@ func (cc *ClientConn) lbWatcher() {
}
cc.mu.Unlock()
for _, a := range add {
cc.resetAddrConn(a, true, nil)
if doneChan != nil {
err := cc.resetAddrConn(a, true, nil)
if err == nil {
close(doneChan)
doneChan = nil
}
} else {
cc.resetAddrConn(a, false, nil)
}
}
for _, c := range del {
c.tearDown(errConnDrain)
@ -544,7 +546,7 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
@ -594,8 +596,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
stale.tearDown(tearDownErr)
}
}
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if block {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
@ -901,9 +902,9 @@ func (ac *addrConn) transportMonitor() {
// In both cases, a new ac is created.
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
}
return
case <-t.Error():
@ -912,7 +913,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
default:
}

View File

@ -69,6 +69,18 @@ func TestTLSDialTimeout(t *testing.T) {
}
}
func TestDefaultAuthority(t *testing.T) {
target := "Non-Existent.Server:8080"
conn, err := Dial(target, WithInsecure())
if err != nil {
t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
}
conn.Close()
if conn.authority != target {
t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, target)
}
}
func TestTLSServerNameOverwrite(t *testing.T) {
overwriteServerName := "over.write.server.name"
creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", overwriteServerName)
@ -253,3 +265,44 @@ func TestDialWithBlockErrorOnNonTemporaryErrorDialer(t *testing.T) {
t.Fatalf("Dial(%q) = %v, want %v", "", err, context.DeadlineExceeded)
}
}
// emptyBalancer returns an empty set of servers.
type emptyBalancer struct {
ch chan []Address
}
func newEmptyBalancer() Balancer {
return &emptyBalancer{ch: make(chan []Address, 1)}
}
func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error {
b.ch <- nil
return nil
}
func (b *emptyBalancer) Up(_ Address) func(error) {
return nil
}
func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) {
return Address{}, nil, nil
}
func (b *emptyBalancer) Notify() <-chan []Address {
return b.ch
}
func (b *emptyBalancer) Close() error {
close(b.ch)
return nil
}
func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dialDone := make(chan struct{})
go func() {
conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
if err != nil {
t.Fatalf("unexpected error dialing connection: %v", err)
}
conn.Close()
close(dialDone)
}()
<-dialDone
cancel()
}

View File

@ -102,6 +102,10 @@ type TransportCredentials interface {
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
// Implementations must use the provided context to implement timely cancellation.
// gRPC will try to reconnect if the error returned is a temporary error
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
// If the returned error is a wrapper error, implementations should make sure that
// the error implements Temporary() to have the correct retry behaviors.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about

View File

@ -39,6 +39,8 @@ package grpclb
import (
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
@ -95,6 +97,7 @@ type addrInfo struct {
type balancer struct {
r naming.Resolver
target string
mu sync.Mutex
seq int // a sequence number to make sure addrCh does not get stale addresses.
w naming.Watcher
@ -105,6 +108,7 @@ type balancer struct {
waitCh chan struct{}
done bool
expTimer *time.Timer
rand *rand.Rand
}
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo) error {
@ -176,6 +180,11 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo
case <-ch:
default:
}
// Pick a random one from the list, instead of always using the first one.
if l := len(b.rbs); l > 1 {
tmpIdx := b.rand.Intn(l - 1)
b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
}
ch <- b.rbs[0]
}
}
@ -217,7 +226,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
for _, s := range servers {
md := metadata.Pairs("lb-token", s.LoadBalanceToken)
addr := grpc.Address{
Addr: fmt.Sprintf("%s:%d", s.IpAddress, s.Port),
Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port),
Metadata: &md,
}
sl = append(sl, &addrInfo{
@ -265,7 +274,9 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret
b.mu.Unlock()
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: new(lbpb.InitialLoadBalanceRequest),
InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: b.target,
},
},
}
if err := stream.Send(initReq); err != nil {
@ -310,10 +321,12 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret
}
func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
b.rand = rand.New(rand.NewSource(time.Now().Unix()))
// TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil {
return errors.New("there is no name resolver installed")
}
b.target = target
b.mu.Lock()
if b.done {
b.mu.Unlock()

View File

@ -180,6 +180,14 @@ func (b *remoteBalancer) stop() {
}
func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
initReq := req.GetInitialRequest()
if initReq.Name != besn {
return grpc.Errorf(codes.InvalidArgument, "invalid service name: %v", initReq.Name)
}
resp := &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
InitialResponse: new(lbpb.InitialLoadBalanceResponse),
@ -264,7 +272,7 @@ func TestGRPCLB(t *testing.T) {
t.Fatalf("Failed to generate the port number %v", err)
}
be := &lbpb.Server{
IpAddress: []byte(beAddr[0]),
IpAddress: beLis.Addr().(*net.TCPAddr).IP,
Port: int32(bePort),
LoadBalanceToken: lbToken,
}
@ -332,25 +340,19 @@ func TestDropRequest(t *testing.T) {
if err != nil {
t.Fatalf("Failed to generate the port number %v", err)
}
var bes []*lbpb.Server
be := &lbpb.Server{
IpAddress: []byte(beAddr1[0]),
sls := []*lbpb.ServerList{{
Servers: []*lbpb.Server{{
IpAddress: beLis1.Addr().(*net.TCPAddr).IP,
Port: int32(bePort1),
LoadBalanceToken: lbToken,
DropRequest: true,
}
bes = append(bes, be)
be = &lbpb.Server{
IpAddress: []byte(beAddr2[0]),
}, {
IpAddress: beLis2.Addr().(*net.TCPAddr).IP,
Port: int32(bePort2),
LoadBalanceToken: lbToken,
DropRequest: false,
}
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
sls := []*lbpb.ServerList{sl}
}},
}}
intervals := []time.Duration{0}
ls := newRemoteBalancer(sls, intervals)
lbpb.RegisterLoadBalancerServer(lb, ls)
@ -371,19 +373,23 @@ func TestDropRequest(t *testing.T) {
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
// The 1st fail-fast RPC should fail because the 1st backend has DropRequest set to true.
helloC := hwpb.NewGreeterClient(cc)
// The 1st, non-fail-fast RPC should succeed. This ensures both server
// connections are made, because the first one has DropRequest set to true.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
}
for i := 0; i < 3; i++ {
// Odd fail-fast RPCs should fail, because the 1st backend has DropRequest
// set to true.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable)
}
// The 2nd fail-fast RPC should succeed since it chooses the non-drop-request backend according
// to the round robin policy.
// Even fail-fast RPCs should succeed since they choose the
// non-drop-request backend according to the round robin policy.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
}
// The 3nd non-fail-fast RPC should succeed.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
}
cc.Close()
}
@ -412,7 +418,7 @@ func TestDropRequestFailedNonFailFast(t *testing.T) {
t.Fatalf("Failed to generate the port number %v", err)
}
be := &lbpb.Server{
IpAddress: []byte(beAddr[0]),
IpAddress: beLis.Addr().(*net.TCPAddr).IP,
Port: int32(bePort),
LoadBalanceToken: lbToken,
DropRequest: true,
@ -475,7 +481,7 @@ func TestServerExpiration(t *testing.T) {
t.Fatalf("Failed to generate the port number %v", err)
}
be := &lbpb.Server{
IpAddress: []byte(beAddr[0]),
IpAddress: beLis.Addr().(*net.TCPAddr).IP,
Port: int32(bePort),
LoadBalanceToken: lbToken,
}

View File

@ -41,6 +41,8 @@ import (
// ClientParameters is used to set keepalive parameters on the client-side.
// These configure how the client will actively probe to notice when a connection broken
// and to cause activity so intermediaries are aware the connection is still in use.
// Make sure these parameters are set in coordination with the keepalive policy on the server,
// as incompatible settings can result in closing of connection.
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
Time time.Duration // The current default value is infinity.
@ -48,5 +50,31 @@ type ClientParameters struct {
// the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
PermitWithoutStream bool // false by default.
}
// ServerParameters is used to set keepalive and max-age parameters on the server-side.
type ServerParameters struct {
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
MaxConnectionIdle time.Duration // The current default value is infinity.
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
MaxConnectionAge time.Duration // The current default value is infinity.
// MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
Time time.Duration // The current default value is 2 hours.
// After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
}
// EnforcementPolicy is used to set keepalive enforcement policy on the server-side.
// Server will close connection with a client that violates this policy.
type EnforcementPolicy struct {
// MinTime is the minimum amount of time a client should wait before sending a keepalive ping.
MinTime time.Duration // The current default value is 5 minutes.
// If true, server expects keepalive pings even when there are no active streams(RPCs).
PermitWithoutStream bool // false by default.
}

View File

@ -525,3 +525,6 @@ type ServiceConfig struct {
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion4 = true
// Version is the current grpc version.
const Version = "1.3.0-dev"

View File

@ -53,6 +53,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
@ -109,15 +110,18 @@ type options struct {
codec Codec
cp Compressor
dc Decompressor
maxReceiveMessageSize int
maxSendMessageSize int
maxMsgSize int
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
}
const defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 // Use 4MB as the default receive message size limit.
@ -126,6 +130,20 @@ const defaultServerMaxSendMessageSize = 1024 * 1024 * 4 // Use 4MB as the def
// A ServerOption sets options.
type ServerOption func(*options)
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
return func(o *options) {
o.keepaliveParams = kp
}
}
// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
return func(o *options) {
o.keepalivePolicy = kep
}
}
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
@ -488,6 +506,8 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
AuthInfo: authInfo,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {

View File

@ -93,6 +93,7 @@ var (
"Key": []string{"foo"},
}
testAppUA = "myApp1/1.0 myApp2/0.9"
failAppUA = "fail-this-RPC"
)
var raceMode bool // set by race_test.go in race mode
@ -107,10 +108,10 @@ type testServer struct {
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
if md, ok := metadata.FromContext(ctx); ok {
// For testing purpose, returns an error if there is attached metadata other than
// the user agent set by the client application.
if _, ok := md["user-agent"]; !ok {
return nil, grpc.Errorf(codes.DataLoss, "missing expected user-agent")
// For testing purpose, returns an error if user-agent is failAppUA.
// To test that client gets the correct error.
if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
return nil, grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
}
var str []string
for _, entry := range md["user-agent"] {
@ -215,9 +216,10 @@ func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest
if _, exists := md[":authority"]; !exists {
return grpc.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
}
// For testing purpose, returns an error if there is attached metadata except for authority.
if len(md) > 1 {
return grpc.Errorf(codes.DataLoss, "got extra metadata")
// For testing purpose, returns an error if user-agent is failAppUA.
// To test that client gets the correct error.
if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
return grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
}
}
cs := args.GetResponseParameters()
@ -1950,8 +1952,8 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
}
if v, ok := header["ua"]; !ok || v[0] != testAppUA {
t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA)
if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
}
te.srv.Stop()
@ -1966,12 +1968,13 @@ func TestFailedEmptyUnary(t *testing.T) {
func testFailedEmptyUnary(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = failAppUA
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
ctx := metadata.NewContext(context.Background(), testMetadata)
wantErr := grpc.Errorf(codes.DataLoss, "missing expected user-agent")
wantErr := grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !equalErrors(err, wantErr) {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
}
@ -2173,6 +2176,7 @@ func testMetadataUnaryRPC(t *testing.T, e env) {
if header != nil {
delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
delete(header, "date") // the Date header is also optional
delete(header, "user-agent")
}
if !reflect.DeepEqual(header, testMetadata) {
t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
@ -2288,6 +2292,7 @@ func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
}
delete(header, "user-agent")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -2331,6 +2336,7 @@ func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
}
delete(header, "user-agent")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -2373,6 +2379,7 @@ func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
}
delete(header, "user-agent")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -2416,6 +2423,7 @@ func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
if err != nil {
t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
}
delete(header, "user-agent")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -2478,6 +2486,7 @@ func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
if err != nil {
t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
}
delete(header, "user-agent")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -2535,6 +2544,7 @@ func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
if err != nil {
t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
}
delete(header, "user-agent")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -2911,12 +2921,14 @@ func testMetadataStreamingRPC(t *testing.T, e env) {
delete(headerMD, "transport_security_type")
}
delete(headerMD, "trailer") // ignore if present
delete(headerMD, "user-agent")
if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
}
// test the cached value.
headerMD, err = stream.Header()
delete(headerMD, "trailer") // ignore if present
delete(headerMD, "user-agent")
if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
}
@ -3022,6 +3034,7 @@ func TestFailedServerStreaming(t *testing.T) {
func testFailedServerStreaming(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = failAppUA
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3041,7 +3054,7 @@ func testFailedServerStreaming(t *testing.T, e env) {
if err != nil {
t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
}
wantErr := grpc.Errorf(codes.DataLoss, "got extra metadata")
wantErr := grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
if _, err := stream.Recv(); !equalErrors(err, wantErr) {
t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
}

View File

@ -49,9 +49,15 @@ const (
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
defaultClientKeepaliveTime = infinity
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
defaultMaxConnectionIdle = infinity
defaultMaxConnectionAge = infinity
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
)
// The following defines various control items which could flow through
@ -79,6 +85,8 @@ type resetStream struct {
func (*resetStream) item() {}
type goAway struct {
code http2.ErrCode
debugData []byte
}
func (*goAway) item() {}

View File

@ -187,23 +187,19 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
}
ua := primaryUA
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
kp.Time = defaultKeepaliveTime
kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
kp.Timeout = defaultClientKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
target: addr.Addr,
userAgent: ua,
userAgent: opts.UserAgent,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
@ -897,6 +893,9 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
}
t.mu.Lock()
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {

View File

@ -38,9 +38,12 @@ import (
"errors"
"io"
"math"
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
"golang.org/x/net/http2"
@ -48,6 +51,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@ -90,11 +94,33 @@ type http2Server struct {
stats stats.Handler
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
kep keepalive.EnforcementPolicy
// The time instance last ping was received.
lastPingAt time.Time
// Number of times the client has violated keepalive ping policy so far.
pingStrikes uint8
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
// idle is the time instant when the connection went idle.
// This is either the begining of the connection or when the number of
// RPCs go down to 0.
// When the connection is busy, this value is set to 0.
idle time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@ -128,6 +154,28 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
kp := config.KeepaliveParams
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge.
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
if kp.Time == 0 {
kp.Time = defaultServerKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
@ -149,6 +197,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@ -159,6 +210,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
go t.keepalive()
t.writableChan <- 0
return t, nil
}
@ -248,6 +300,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
t.mu.Unlock()
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
@ -295,6 +350,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.Close()
return
}
atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@ -305,6 +361,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
for {
frame, err := t.framer.readFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
@ -463,6 +520,11 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
t.controlBuf.put(&settings{ack: true, ss: ss})
}
const (
maxPingStrikes = 2
defaultPingTimeout = 2 * time.Hour
)
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
return
@ -470,6 +532,38 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
now := time.Now()
defer func() {
t.lastPingAt = now
}()
// A reset ping strikes means that we don't need to check for policy
// violation for this ping and the pingStrikes counter should be set
// to 0.
if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
t.pingStrikes = 0
return
}
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if ns < 1 && !t.kep.PermitWithoutStream {
// Keepalive shouldn't be active thus, this new ping should
// have come after atleast defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
t.pingStrikes++
}
}
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
}
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@ -488,6 +582,13 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
first := true
endHeaders := false
var err error
defer func() {
if err == nil {
// Reset ping strikes when seding headers since that might cause the
// peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
// Sends the headers in a single batch.
for !endHeaders {
size := t.hBuf.Len()
@ -631,7 +732,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
// TODO(zhaoq): Support multi-writers for a single stream.
var writeHeaderFrame bool
s.mu.Lock()
@ -646,6 +747,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
defer func() {
if err == nil {
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
r := bytes.NewBuffer(data)
for {
if r.Len() == 0 {
@ -735,6 +843,91 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
}
}
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
p := &ping{}
var pingSent bool
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
keepalive := time.NewTimer(t.kp.Time)
// NOTE: All exit paths of this function should reset their
// respecitve timers. A failure to do so will cause the
// following clean-up to deadlock and eventually leak.
defer func() {
if !maxIdle.Stop() {
<-maxIdle.C
}
if !maxAge.Stop() {
<-maxAge.C
}
if !keepalive.Stop() {
<-keepalive.C
}
}()
for {
select {
case <-maxIdle.C:
t.mu.Lock()
idle := t.idle
if idle.IsZero() { // The connection is non-idle.
t.mu.Unlock()
maxIdle.Reset(t.kp.MaxConnectionIdle)
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.state = draining
t.mu.Unlock()
t.Drain()
// Reseting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity)
return
}
t.mu.Unlock()
maxIdle.Reset(val)
case <-maxAge.C:
t.mu.Lock()
t.state = draining
t.mu.Unlock()
t.Drain()
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-maxAge.C:
// Close the connection after grace period.
t.Close()
// Reseting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.shutdownChan:
}
return
case <-keepalive.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
pingSent = false
keepalive.Reset(t.kp.Time)
continue
}
if pingSent {
t.Close()
// Reseting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
return
}
pingSent = true
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.shutdownChan:
return
}
}
}
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
@ -766,7 +959,10 @@ func (t *http2Server) controller() {
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.code == http2.ErrCodeEnhanceYourCalm {
t.Close()
}
case *flushIO:
t.framer.flushWrite()
case *ping:
@ -816,6 +1012,9 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
}
if t.state == draining && len(t.activeStreams) == 0 {
defer t.Close()
}
@ -843,5 +1042,17 @@ func (t *http2Server) RemoteAddr() net.Addr {
}
func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{})
t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
}
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
func getJitter(v time.Duration) time.Duration {
if v == infinity {
return 0
}
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
j := rgen.Int63n(2*r) - r
return time.Duration(j)
}

View File

@ -52,8 +52,6 @@ import (
)
const (
// The primary user agent
primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@ -188,15 +186,6 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
d.method = f.Value
default:
if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
if f.Name == "user-agent" {
i := strings.LastIndex(f.Value, " ")
if i == -1 {
// There is no application user agent string being set.
return
}
// Extract the application user agent string.
f.Value = f.Value[:i]
}
if d.mdata == nil {
d.mdata = make(map[string][]string)
}

View File

@ -1,51 +0,0 @@
// +build !go1.6
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"net"
"time"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
var dialer net.Dialer
if deadline, ok := ctx.Deadline(); ok {
dialer.Timeout = deadline.Sub(time.Now())
}
return dialer.Dial(network, address)
}

View File

@ -369,6 +369,8 @@ type ServerConfig struct {
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
}
// NewServerTransport creates a ServerTransport with conn or non-nil error

View File

@ -156,7 +156,7 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stre
}
// start starts server. Other goroutines should block on s.readyChan for further operations.
func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
var err error
if port == 0 {
s.lis, err = net.Listen("tcp", "localhost:0")
@ -180,10 +180,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
if err != nil {
return
}
config := &ServerConfig{
MaxStreams: maxStreams,
}
transport, err := NewServerTransport("http2", conn, config)
transport, err := NewServerTransport("http2", conn, serverConfig)
if err != nil {
return
}
@ -252,12 +249,12 @@ func (s *server) stop() {
}
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
}
func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
server := &server{startedErr: make(chan error, 1)}
go server.start(t, port, maxStreams, ht)
go server.start(t, port, serverConfig, ht)
server.wait(t, 2*time.Second)
addr := "localhost:" + server.port
var (
@ -302,6 +299,145 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
return tr
}
// TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
// An idle client is one who doesn't make any RPC calls for a duration of
// MaxConnectionIdle time.
func TestMaxConnectionIdle(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create RPC request: %v", err)
}
stream.mu.Lock()
stream.rstStream = true
stream.mu.Unlock()
client.CloseStream(stream, nil)
// wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are mode
timeout := time.NewTimer(time.Second * 4)
select {
case <-client.GoAway():
if !timeout.Stop() {
<-timeout.C
}
case <-timeout.C:
t.Fatalf("Test timed out, expected a GoAway from the server.")
}
}
// TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
func TestMaxConnectionIdleNegative(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create RPC request: %v", err)
}
timeout := time.NewTimer(time.Second * 4)
select {
case <-client.GoAway():
if !timeout.Stop() {
<-timeout.C
}
t.Fatalf("A non-idle client received a GoAway.")
case <-timeout.C:
}
}
// TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
func TestMaxConnectionAge(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionAge: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Client failed to create stream: %v", err)
}
// Wait for max-age logic to send GoAway.
timeout := time.NewTimer(4 * time.Second)
select {
case <-client.GoAway():
if !timeout.Stop() {
<-timeout.C
}
case <-timeout.C:
t.Fatalf("Test timer out, expected a GoAway from the server.")
}
}
// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: 2 * time.Second,
Timeout: 1 * time.Second,
},
}
server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer c.Close()
client, err := net.Dial("tcp", server.lis.Addr().String())
if err != nil {
t.Fatalf("Failed to dial: %v", err)
}
defer client.Close()
// Set read deadline on client conn so that it doesn't block forever in errorsome cases.
client.SetReadDeadline(time.Now().Add(10 * time.Second))
// Wait for keepalive logic to close the connection.
time.Sleep(4 * time.Second)
b := make([]byte, 24)
for {
_, err = client.Read(b)
if err == nil {
continue
}
if err != io.EOF {
t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
}
break
}
}
// TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
func TestKeepaliveServerNegative(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: 2 * time.Second,
Timeout: 1 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
// Give keepalive logic some time by sleeping.
time.Sleep(4 * time.Second)
// Assert that client is still active.
clientTr := client.(*http2Client)
clientTr.mu.Lock()
defer clientTr.mu.Unlock()
if clientTr.state != reachable {
t.Fatalf("Test failed: Expected server-client connection to be healthy.")
}
}
func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
@ -362,7 +498,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
}
defer conn.Close()
// Create a stream.
_, err := tr.NewStream(context.Background(), &CallHdr{})
_, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create a new stream: %v", err)
}
@ -378,7 +514,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
}
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
@ -396,6 +532,138 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
}
}
func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 2 * time.Second,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 50 * time.Millisecond,
PermitWithoutStream: true,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer server.stop()
defer client.Close()
timeout := time.NewTimer(2 * time.Second)
select {
case <-client.GoAway():
if !timeout.Stop() {
<-timeout.C
}
case <-timeout.C:
t.Fatalf("Test failed: Expected a GoAway from server.")
}
time.Sleep(500 * time.Millisecond)
ct := client.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test failed: Expected the connection to be closed.")
}
}
func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 2 * time.Second,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 50 * time.Millisecond,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer server.stop()
defer client.Close()
if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
t.Fatalf("Client failed to create stream.")
}
timeout := time.NewTimer(2 * time.Second)
select {
case <-client.GoAway():
if !timeout.Stop() {
<-timeout.C
}
case <-timeout.C:
t.Fatalf("Test failed: Expected a GoAway from server.")
}
time.Sleep(500 * time.Millisecond)
ct := client.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test failed: Expected the connection to be closed.")
}
}
func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
PermitWithoutStream: true,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 101 * time.Millisecond,
Timeout: 50 * time.Millisecond,
PermitWithoutStream: true,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer server.stop()
defer client.Close()
// Give keepalive enough time.
time.Sleep(2 * time.Second)
// Assert that connection is healthy.
ct := client.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected connection to be healthy.")
}
}
func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 101 * time.Millisecond,
Timeout: 50 * time.Millisecond,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer server.stop()
defer client.Close()
if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
t.Fatalf("Client failed to create stream.")
}
// Give keepalive enough time.
time.Sleep(2 * time.Second)
// Assert that connection is healthy.
ct := client.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected connection to be healthy.")
}
}
func TestClientSendAndReceive(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
callHdr := &CallHdr{
@ -626,9 +894,9 @@ func TestMaxStreams(t *testing.T) {
t.Fatalf("Client has not received the max stream setting in 5 seconds.")
}
cc.mu.Lock()
// cc.streamsQuota should be initialized once receiving the 1st setting frame from
// the server.
if cc.streamsQuota != nil {
// cc.maxStreams should be equal to 1 after having received settings frame from
// server.
if cc.maxStreams == 1 {
cc.mu.Unlock()
select {
case <-cc.streamsQuota.acquire():