diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 2ccff7a2..76d55954 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -25,52 +25,52 @@ }, { "ImportPath": "github.com/cloudfoundry-incubator/candiedyaml", - "Rev": "646fd8fe27c55af8231d9f2524f5dc88f9dfe02d" + "Rev": "5cef21e2e4f0fd147973b558d4db7395176bcd95" }, { "ImportPath": "github.com/coreos/etcd/auth/authpb", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/client", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/clientv3", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/etcdserverpb", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/pkg/pathutil", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/pkg/tlsutil", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/pkg/types", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/coreos/etcd/storage/storagepb", - "Comment": "v2.3.0-479-g32a486b", - "Rev": "32a486b4628df701f172595d3397e6402ff776df" + "Comment": "v2.3.0-522-gda1138f", + "Rev": "da1138f8de842d10ad58a0c0704849080a135527" }, { "ImportPath": "github.com/dustin/go-humanize", @@ -117,7 +117,7 @@ }, { "ImportPath": "github.com/golang/protobuf/proto", - "Rev": "f0a097ddac24fb00e07d2ac17f8671423f3ea47c" + "Rev": "bf531ff1a004f24ee53329dfd5ce0b41bfdc17df" }, { "ImportPath": "github.com/gonum/floats", @@ -235,7 +235,7 @@ }, { "ImportPath": "github.com/spf13/pflag", - "Rev": "7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7" + "Rev": "8f6a28b0916586e7f22fe931ae2fcfc380b1c0e6" }, { "ImportPath": "github.com/ugorji/go/codec", @@ -267,27 +267,27 @@ }, { "ImportPath": "golang.org/x/net/context", - "Rev": "fb93926129b8ec0056f2f458b1f519654814edf0" + "Rev": "815d315ead425c4365077d904a2331ee9e179820" }, { "ImportPath": "golang.org/x/net/context/ctxhttp", - "Rev": "fb93926129b8ec0056f2f458b1f519654814edf0" + "Rev": "815d315ead425c4365077d904a2331ee9e179820" }, { "ImportPath": "golang.org/x/net/http2", - "Rev": "fb93926129b8ec0056f2f458b1f519654814edf0" + "Rev": "815d315ead425c4365077d904a2331ee9e179820" }, { "ImportPath": "golang.org/x/net/http2/hpack", - "Rev": "fb93926129b8ec0056f2f458b1f519654814edf0" + "Rev": "815d315ead425c4365077d904a2331ee9e179820" }, { "ImportPath": "golang.org/x/net/internal/timeseries", - "Rev": "fb93926129b8ec0056f2f458b1f519654814edf0" + "Rev": "815d315ead425c4365077d904a2331ee9e179820" }, { "ImportPath": "golang.org/x/net/trace", - "Rev": "fb93926129b8ec0056f2f458b1f519654814edf0" + "Rev": "815d315ead425c4365077d904a2331ee9e179820" }, { "ImportPath": "golang.org/x/oauth2", @@ -387,43 +387,43 @@ }, { "ImportPath": "google.golang.org/grpc", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/codes", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/credentials", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/credentials/oauth", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/grpclog", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/internal", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/metadata", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/naming", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/peer", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "google.golang.org/grpc/transport", - "Rev": "9ac074585f926c8506b6351bfdc396d2b19b1cb1" + "Rev": "306a1ee0fe2c012a074592da8ffe4e33b5204f2a" }, { "ImportPath": "gopkg.in/yaml.v2", diff --git a/vendor/github.com/cloudfoundry-incubator/candiedyaml/scanner.go b/vendor/github.com/cloudfoundry-incubator/candiedyaml/scanner.go index f856a563..5c080a06 100644 --- a/vendor/github.com/cloudfoundry-incubator/candiedyaml/scanner.go +++ b/vendor/github.com/cloudfoundry-incubator/candiedyaml/scanner.go @@ -909,7 +909,7 @@ func yaml_parser_fetch_next_token(parser *yaml_parser_t) bool { b == '@' || b == '`') || (b == '-' && !is_blank(buf[pos+1])) || (parser.flow_level == 0 && - (buf[pos] == '?' || buf[pos+1] == ':') && + (buf[pos] == '?' || buf[pos] == ':') && !is_blank(buf[pos+1])) { return yaml_parser_fetch_plain_scalar(parser) } diff --git a/vendor/github.com/coreos/etcd/clientv3/auth.go b/vendor/github.com/coreos/etcd/clientv3/auth.go index e44ac0c6..9370295f 100644 --- a/vendor/github.com/coreos/etcd/clientv3/auth.go +++ b/vendor/github.com/coreos/etcd/clientv3/auth.go @@ -26,6 +26,7 @@ import ( type ( AuthEnableResponse pb.AuthEnableResponse + AuthenticateResponse pb.AuthenticateResponse AuthUserAddResponse pb.AuthUserAddResponse AuthUserDeleteResponse pb.AuthUserDeleteResponse AuthUserChangePasswordResponse pb.AuthUserChangePasswordResponse @@ -46,6 +47,9 @@ type Auth interface { // AuthEnable enables auth of an etcd cluster. AuthEnable(ctx context.Context) (*AuthEnableResponse, error) + // Authenticate does authenticate with given user name and password. + Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) + // UserAdd adds a new user to an etcd cluster. UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) @@ -86,6 +90,11 @@ func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { return (*AuthEnableResponse)(resp), err } +func (auth *auth) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { + resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) + return (*AuthenticateResponse)(resp), err +} + func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) return (*AuthUserAddResponse)(resp), err diff --git a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go index 376a91dc..4c898208 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go +++ b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go @@ -41,4 +41,5 @@ var ( ErrUserNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: user name not found") ErrRoleAlreadyExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name already exists") ErrRoleNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found") + ErrAuthFailed = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password") ) diff --git a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.pb.go b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.pb.go index 0b3a5fed..49412f16 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.pb.go @@ -38,7 +38,8 @@ type InternalRaftRequest struct { AuthUserGrant *AuthUserGrantRequest `protobuf:"bytes,14,opt,name=auth_user_grant" json:"auth_user_grant,omitempty"` AuthRoleAdd *AuthRoleAddRequest `protobuf:"bytes,15,opt,name=auth_role_add" json:"auth_role_add,omitempty"` AuthRoleGrant *AuthRoleGrantRequest `protobuf:"bytes,16,opt,name=auth_role_grant" json:"auth_role_grant,omitempty"` - Alarm *AlarmRequest `protobuf:"bytes,17,opt,name=alarm" json:"alarm,omitempty"` + Authenticate *AuthenticateRequest `protobuf:"bytes,17,opt,name=authenticate" json:"authenticate,omitempty"` + Alarm *AlarmRequest `protobuf:"bytes,18,opt,name=alarm" json:"alarm,omitempty"` } func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } @@ -228,18 +229,30 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) { } i += n15 } - if m.Alarm != nil { + if m.Authenticate != nil { data[i] = 0x8a i++ data[i] = 0x1 i++ - i = encodeVarintRaftInternal(data, i, uint64(m.Alarm.Size())) - n16, err := m.Alarm.MarshalTo(data[i:]) + i = encodeVarintRaftInternal(data, i, uint64(m.Authenticate.Size())) + n16, err := m.Authenticate.MarshalTo(data[i:]) if err != nil { return 0, err } i += n16 } + if m.Alarm != nil { + data[i] = 0x92 + i++ + data[i] = 0x1 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Alarm.Size())) + n17, err := m.Alarm.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n17 + } return i, nil } @@ -354,6 +367,10 @@ func (m *InternalRaftRequest) Size() (n int) { l = m.AuthRoleGrant.Size() n += 2 + l + sovRaftInternal(uint64(l)) } + if m.Authenticate != nil { + l = m.Authenticate.Size() + n += 2 + l + sovRaftInternal(uint64(l)) + } if m.Alarm != nil { l = m.Alarm.Size() n += 2 + l + sovRaftInternal(uint64(l)) @@ -924,6 +941,39 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error { } iNdEx = postIndex case 17: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Authenticate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaftInternal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Authenticate == nil { + m.Authenticate = &AuthenticateRequest{} + } + if err := m.Authenticate.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 18: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Alarm", wireType) } diff --git a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.proto b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.proto index 2a6c9295..2ea0c057 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.proto +++ b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/raft_internal.proto @@ -32,8 +32,9 @@ message InternalRaftRequest { AuthUserGrantRequest auth_user_grant = 14; AuthRoleAddRequest auth_role_add = 15; AuthRoleGrantRequest auth_role_grant = 16; + AuthenticateRequest authenticate = 17; - AlarmRequest alarm = 17; + AlarmRequest alarm = 18; } message EmptyResponse { diff --git a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.pb.go b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.pb.go index 3e2935bf..3e6ab14f 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.pb.go +++ b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.pb.go @@ -1374,6 +1374,8 @@ func (m *AuthDisableRequest) String() string { return proto.CompactTextString(m) func (*AuthDisableRequest) ProtoMessage() {} type AuthenticateRequest struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Password string `protobuf:"bytes,2,opt,name=password,proto3" json:"password,omitempty"` } func (m *AuthenticateRequest) Reset() { *m = AuthenticateRequest{} } @@ -1514,6 +1516,8 @@ func (m *AuthDisableResponse) GetHeader() *ResponseHeader { type AuthenticateResponse struct { Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + // token is an authorized token that can be used in succeeding RPCs + Token string `protobuf:"bytes,2,opt,name=token,proto3" json:"token,omitempty"` } func (m *AuthenticateResponse) Reset() { *m = AuthenticateResponse{} } @@ -4755,6 +4759,18 @@ func (m *AuthenticateRequest) MarshalTo(data []byte) (int, error) { _ = i var l int _ = l + if len(m.Name) > 0 { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Name))) + i += copy(data[i:], m.Name) + } + if len(m.Password) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Password))) + i += copy(data[i:], m.Password) + } return i, nil } @@ -5101,6 +5117,12 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) { } i += n35 } + if len(m.Token) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Token))) + i += copy(data[i:], m.Token) + } return i, nil } @@ -6187,6 +6209,14 @@ func (m *AuthDisableRequest) Size() (n int) { func (m *AuthenticateRequest) Size() (n int) { var l int _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + l = len(m.Password) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } return n } @@ -6323,6 +6353,10 @@ func (m *AuthenticateResponse) Size() (n int) { l = m.Header.Size() n += 1 + l + sovRpc(uint64(l)) } + l = len(m.Token) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } return n } @@ -11393,6 +11427,64 @@ func (m *AuthenticateRequest) Unmarshal(data []byte) error { return fmt.Errorf("proto: AuthenticateRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Password", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Password = string(data[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(data[iNdEx:]) @@ -12486,6 +12578,35 @@ func (m *AuthenticateResponse) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Token = string(data[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(data[iNdEx:]) diff --git a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.proto b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.proto index 2904049b..c8fb8b03 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.proto +++ b/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/rpc.proto @@ -559,6 +559,8 @@ message AuthDisableRequest { } message AuthenticateRequest { + string name = 1; + string password = 2; } message AuthUserAddRequest { @@ -622,6 +624,8 @@ message AuthDisableResponse { message AuthenticateResponse { ResponseHeader header = 1; + // token is an authorized token that can be used in succeeding RPCs + string token = 2; } message AuthUserAddResponse { diff --git a/vendor/github.com/golang/protobuf/proto/properties.go b/vendor/github.com/golang/protobuf/proto/properties.go index 4fe2ec22..880eb22d 100644 --- a/vendor/github.com/golang/protobuf/proto/properties.go +++ b/vendor/github.com/golang/protobuf/proto/properties.go @@ -701,7 +701,11 @@ func getPropertiesLocked(t reflect.Type) *StructProperties { if f.Name == "XXX_unrecognized" { // special case prop.unrecField = toField(&f) } - oneof := f.Tag.Get("protobuf_oneof") != "" // special case + oneof := f.Tag.Get("protobuf_oneof") // special case + if oneof != "" { + // Oneof fields don't use the traditional protobuf tag. + p.OrigName = oneof + } prop.Prop[i] = p prop.order[i] = i if debug { @@ -711,7 +715,7 @@ func getPropertiesLocked(t reflect.Type) *StructProperties { } print("\n") } - if p.enc == nil && !strings.HasPrefix(f.Name, "XXX_") && !oneof { + if p.enc == nil && !strings.HasPrefix(f.Name, "XXX_") && oneof == "" { fmt.Fprintln(os.Stderr, "proto: no encoder for", f.Name, f.Type.String(), "[GetProperties]") } } diff --git a/vendor/github.com/spf13/pflag/.travis.yml b/vendor/github.com/spf13/pflag/.travis.yml index c7d8e05d..780c7ffb 100644 --- a/vendor/github.com/spf13/pflag/.travis.yml +++ b/vendor/github.com/spf13/pflag/.travis.yml @@ -3,9 +3,8 @@ sudo: false language: go go: - - 1.3 - - 1.4 - 1.5 + - 1.6 - tip install: diff --git a/vendor/github.com/spf13/pflag/flag.go b/vendor/github.com/spf13/pflag/flag.go index deac3af1..bb03cfca 100644 --- a/vendor/github.com/spf13/pflag/flag.go +++ b/vendor/github.com/spf13/pflag/flag.go @@ -242,6 +242,17 @@ func (f *FlagSet) HasFlags() bool { return len(f.formal) > 0 } +// HasAvailableFlags returns a bool to indicate if the FlagSet has any flags +// definied that are not hidden or deprecated. +func (f *FlagSet) HasAvailableFlags() bool { + for _, flag := range f.formal { + if !flag.Hidden && len(flag.Deprecated) == 0 { + return true + } + } + return false +} + // VisitAll visits the command-line flags in lexicographical order, calling // fn for each. It visits all flags, even those not set. func VisitAll(fn func(*Flag)) { diff --git a/vendor/golang.org/x/net/context/ctxhttp/cancelreq.go b/vendor/golang.org/x/net/context/ctxhttp/cancelreq.go deleted file mode 100644 index e3170e33..00000000 --- a/vendor/golang.org/x/net/context/ctxhttp/cancelreq.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build go1.5 - -package ctxhttp - -import "net/http" - -func canceler(client *http.Client, req *http.Request) func() { - // TODO(djd): Respect any existing value of req.Cancel. - ch := make(chan struct{}) - req.Cancel = ch - - return func() { - close(ch) - } -} diff --git a/vendor/golang.org/x/net/context/ctxhttp/cancelreq_go14.go b/vendor/golang.org/x/net/context/ctxhttp/cancelreq_go14.go deleted file mode 100644 index 56bcbadb..00000000 --- a/vendor/golang.org/x/net/context/ctxhttp/cancelreq_go14.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build !go1.5 - -package ctxhttp - -import "net/http" - -type requestCanceler interface { - CancelRequest(*http.Request) -} - -func canceler(client *http.Client, req *http.Request) func() { - rc, ok := client.Transport.(requestCanceler) - if !ok { - return func() {} - } - return func() { - rc.CancelRequest(req) - } -} diff --git a/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go index e35860a7..12da7775 100644 --- a/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go +++ b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go @@ -30,8 +30,9 @@ func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp client = http.DefaultClient } - // Request cancelation changed in Go 1.5, see cancelreq.go and cancelreq_go14.go. - cancel := canceler(client, req) + // TODO(djd): Respect any existing value of req.Cancel. + cancel := make(chan struct{}) + req.Cancel = cancel type responseAndError struct { resp *http.Response @@ -55,7 +56,7 @@ func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp select { case <-ctx.Done(): testHookContextDoneBeforeHeaders() - cancel() + close(cancel) // Clean up after the goroutine calling client.Do: go func() { if r := <-result; r.resp != nil { @@ -76,7 +77,7 @@ func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp go func() { select { case <-ctx.Done(): - cancel() + close(cancel) case <-c: // The response's Body is closed. } diff --git a/vendor/google.golang.org/grpc/backoff.go b/vendor/google.golang.org/grpc/backoff.go index d0113ec9..dc4858eb 100644 --- a/vendor/google.golang.org/grpc/backoff.go +++ b/vendor/google.golang.org/grpc/backoff.go @@ -8,7 +8,7 @@ import ( // DefaultBackoffConfig uses values specified for backoff in // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. var ( - DefaultBackoffConfig = &BackoffConfig{ + DefaultBackoffConfig = BackoffConfig{ MaxDelay: 120 * time.Second, baseDelay: 1.0 * time.Second, factor: 1.6, @@ -33,7 +33,10 @@ type BackoffConfig struct { // MaxDelay is the upper bound of backoff delay. MaxDelay time.Duration - // TODO(stevvooe): The following fields are not exported, as allowing changes + // TODO(stevvooe): The following fields are not exported, as allowing + // changes would violate the current GRPC specification for backoff. If + // GRPC decides to allow more interesting backoff strategies, these fields + // may be opened up in the future. // baseDelay is the amount of time to wait before retrying after the first // failure. @@ -46,7 +49,16 @@ type BackoffConfig struct { jitter float64 } -func (bc *BackoffConfig) backoff(retries int) (t time.Duration) { +func setDefaults(bc *BackoffConfig) { + md := bc.MaxDelay + *bc = DefaultBackoffConfig + + if md > 0 { + bc.MaxDelay = md + } +} + +func (bc BackoffConfig) backoff(retries int) (t time.Duration) { if retries == 0 { return bc.baseDelay } diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 1562c0f9..6de86e9e 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -115,9 +115,21 @@ func WithPicker(p Picker) DialOption { } } +// WithBackoffMaxDelay configures the dialer to use the provided maximum delay +// when backing off after failed connection attempts. +func WithBackoffMaxDelay(md time.Duration) DialOption { + return WithBackoffConfig(BackoffConfig{MaxDelay: md}) +} + // WithBackoffConfig configures the dialer to use the provided backoff // parameters after connection failures. -func WithBackoffConfig(b *BackoffConfig) DialOption { +// +// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up +// for use. +func WithBackoffConfig(b BackoffConfig) DialOption { + // Set defaults to ensure that provided BackoffConfig is valid and + // unexported fields get default values. + setDefaults(&b) return withBackoff(b) } diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go new file mode 100644 index 00000000..588f59e5 --- /dev/null +++ b/vendor/google.golang.org/grpc/interceptor.go @@ -0,0 +1,74 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package grpc + +import ( + "golang.org/x/net/context" +) + +// UnaryServerInfo consists of various information about a unary RPC on +// server side. All per-rpc information may be mutated by the interceptor. +type UnaryServerInfo struct { + // Server is the service implementation the user provides. This is read-only. + Server interface{} + // FullMethod is the full RPC method string, i.e., /package.service/method. + FullMethod string +} + +// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal +// execution of a unary RPC. +type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) + +// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info +// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper +// of the service method implementation. It is the responsibility of the interceptor to invoke handler +// to complete the RPC. +type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) + +// StreamServerInfo consists of various information about a streaming RPC on +// server side. All per-rpc information may be mutated by the interceptor. +type StreamServerInfo struct { + // FullMethod is the full RPC method string, i.e., /package.service/method. + FullMethod string + // IsClientStream indicates whether the RPC is a client streaming RPC. + IsClientStream bool + // IsServerStream indicates whether the RPC is a server streaming RPC. + IsServerStream bool +} + +// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server. +// info contains all the information of this RPC the interceptor can operate on. And handler is the +// service method implementation. It is the responsibility of the interceptor to invoke handler to +// complete the RPC. +type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 8ad335eb..3192f011 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -409,10 +409,10 @@ func convertCode(err error) codes.Code { return codes.Unknown } -// SupportPackageIsVersion1 is referenced from generated protocol buffer files +// SupportPackageIsVersion2 is referenced from generated protocol buffer files // to assert that that code is compatible with this version of the grpc package. // // This constant may be renamed in the future if a change in the generated code // requires a synchronised update of grpc-go and protoc-gen-go. This constant // should not be referenced from any other code. -const SupportPackageIsVersion1 = true +const SupportPackageIsVersion2 = true diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index bdf68a0f..d3a8073d 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -57,7 +57,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { @@ -99,6 +99,8 @@ type options struct { codec Codec cp Compressor dc Decompressor + unaryInt UnaryServerInterceptor + streamInt StreamServerInterceptor maxConcurrentStreams uint32 useHandlerImpl bool // use http.Handler-based server } @@ -140,6 +142,29 @@ func Creds(c credentials.Credentials) ServerOption { } } +// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the +// server. Only one unary interceptor can be installed. The construction of multiple +// interceptors (e.g., chaining) can be implemented at the caller. +func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { + return func(o *options) { + if o.unaryInt != nil { + panic("The unary server interceptor has been set.") + } + o.unaryInt = i + } +} + +// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the +// server. Only one stream interceptor can be installed. +func StreamInterceptor(i StreamServerInterceptor) ServerOption { + return func(o *options) { + if o.streamInt != nil { + panic("The stream server interceptor has been set.") + } + o.streamInt = i + } +} + // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -494,7 +519,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return nil } - reply, appErr := md.Handler(srv.server, stream.Context(), df) + reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -572,7 +597,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.mu.Unlock() }() } - if appErr := sd.Handler(srv.server, ss); appErr != nil { + var appErr error + if s.opts.streamInt == nil { + appErr = sd.Handler(srv.server, ss) + } else { + info := &StreamServerInfo{ + FullMethod: stream.Method(), + IsClientStream: sd.ClientStreams, + IsServerStream: sd.ServerStreams, + } + appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler) + } + if appErr != nil { if err, ok := appErr.(rpcError); ok { ss.statusCode = err.code ss.statusDesc = err.desc diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index b8320788..22e49cb5 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -47,12 +47,14 @@ import ( "google.golang.org/grpc/transport" ) -type streamHandler func(srv interface{}, stream ServerStream) error +// StreamHandler defines the handler called by gRPC server to complete the +// execution of a streaming RPC. +type StreamHandler func(srv interface{}, stream ServerStream) error // StreamDesc represents a streaming RPC service's method specification. type StreamDesc struct { StreamName string - Handler streamHandler + Handler StreamHandler // At least one of these is true. ServerStreams bool diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go index b2e602e6..7e9bdf33 100644 --- a/vendor/google.golang.org/grpc/transport/control.go +++ b/vendor/google.golang.org/grpc/transport/control.go @@ -162,10 +162,6 @@ func (qb *quotaPool) acquire() <-chan int { type inFlow struct { // The inbound flow control limit for pending data. limit uint32 - // conn points to the shared connection-level inFlow that is shared - // by all streams on that conn. It is nil for the inFlow on the conn - // directly. - conn *inFlow mu sync.Mutex // pendingData is the overall data which have been received but not been @@ -176,97 +172,39 @@ type inFlow struct { pendingUpdate uint32 } -// onData is invoked when some data frame is received. It increments not only its -// own pendingData but also that of the associated connection-level flow. +// onData is invoked when some data frame is received. It updates pendingData. func (f *inFlow) onData(n uint32) error { - if n == 0 { - return nil - } f.mu.Lock() defer f.mu.Unlock() - if f.pendingData+f.pendingUpdate+n > f.limit { - return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit) - } - if f.conn != nil { - if err := f.conn.onData(n); err != nil { - return ConnectionErrorf("%v", err) - } - } f.pendingData += n + if f.pendingData+f.pendingUpdate > f.limit { + return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit) + } return nil } -// adjustConnPendingUpdate increments the connection level pending updates by n. -// This is called to make the proper connection level window updates when -// receiving data frame targeting the canceled RPCs. -func (f *inFlow) adjustConnPendingUpdate(n uint32) (uint32, error) { - if n == 0 || f.conn != nil { - return 0, nil - } +// onRead is invoked when the application reads the data. It returns the window size +// to be sent to the peer. +func (f *inFlow) onRead(n uint32) uint32 { f.mu.Lock() defer f.mu.Unlock() - if f.pendingData+f.pendingUpdate+n > f.limit { - return 0, ConnectionErrorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit) - } - f.pendingUpdate += n - if f.pendingUpdate >= f.limit/4 { - ret := f.pendingUpdate - f.pendingUpdate = 0 - return ret, nil - } - return 0, nil - -} - -// connOnRead updates the connection level states when the application consumes data. -func (f *inFlow) connOnRead(n uint32) uint32 { - if n == 0 || f.conn != nil { + if f.pendingData == 0 { return 0 } - f.mu.Lock() - defer f.mu.Unlock() f.pendingData -= n f.pendingUpdate += n if f.pendingUpdate >= f.limit/4 { - ret := f.pendingUpdate + wu := f.pendingUpdate f.pendingUpdate = 0 - return ret + return wu } return 0 } -// onRead is invoked when the application reads the data. It returns the window updates -// for both stream and connection level. -func (f *inFlow) onRead(n uint32) (swu, cwu uint32) { - if n == 0 { - return - } - f.mu.Lock() - defer f.mu.Unlock() - if f.pendingData == 0 { - // pendingData has been adjusted by restoreConn. - return - } - f.pendingData -= n - f.pendingUpdate += n - if f.pendingUpdate >= f.limit/4 { - swu = f.pendingUpdate - f.pendingUpdate = 0 - } - cwu = f.conn.connOnRead(n) - return -} - -// restoreConn is invoked when a stream is terminated. It removes its stake in -// the connection-level flow and resets its own state. -func (f *inFlow) restoreConn() uint32 { - if f.conn == nil { - return 0 - } +func (f *inFlow) resetPendingData() uint32 { f.mu.Lock() defer f.mu.Unlock() n := f.pendingData f.pendingData = 0 - f.pendingUpdate = 0 - return f.conn.connOnRead(n) + return n } diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 77c05443..63d2c5e0 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -202,17 +202,13 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e } func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { - fc := &inFlow{ - limit: initialWindowSize, - conn: t.fc, - } // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ id: t.nextID, method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), - fc: fc, + fc: &inFlow{limit: initialWindowSize}, sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), } @@ -237,8 +233,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if dl, ok := ctx.Deadline(); ok { timeout = dl.Sub(time.Now()) } - if err := ctx.Err(); err != nil { - return nil, ContextErr(err) + select { + case <-ctx.Done(): + return nil, ContextErr(ctx.Err()) + default: } pr := &peer.Peer{ Addr: t.conn.RemoteAddr(), @@ -404,8 +402,10 @@ func (t *http2Client) CloseStream(s *Stream, err error) { // other goroutines. s.cancel() s.mu.Lock() - if q := s.fc.restoreConn(); q > 0 { - t.controlBuf.put(&windowUpdate{0, q}) + if q := s.fc.resetPendingData(); q > 0 { + if n := t.fc.onRead(q); n > 0 { + t.controlBuf.put(&windowUpdate{0, n}) + } } if s.state == streamDone { s.mu.Unlock() @@ -427,6 +427,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { // accessed any more. func (t *http2Client) Close() (err error) { t.mu.Lock() + if t.state == reachable { + close(t.errorChan) + } if t.state == closing { t.mu.Unlock() return errors.New("transport: Close() was already called") @@ -505,6 +508,10 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the transport. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, ok := err.(StreamError); ok { + // Return the connection quota back. + t.sendQuotaPool.add(len(p)) + } if t.framer.adjustNumWriters(-1) == 0 { // This writer is the last one in this batch and has the // responsibility to flush the buffered frames. It queues @@ -514,6 +521,16 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } return err } + select { + case <-s.ctx.Done(): + t.sendQuotaPool.add(len(p)) + if t.framer.adjustNumWriters(-1) == 0 { + t.controlBuf.put(&flushIO{}) + } + t.writableChan <- 0 + return ContextErr(s.ctx.Err()) + default: + } if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { // Do a force flush iff this is last frame for the entire gRPC message // and the caller is the only writer at this moment. @@ -560,41 +577,39 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Client) updateWindow(s *Stream, n uint32) { - swu, cwu := s.fc.onRead(n) - if swu > 0 { - t.controlBuf.put(&windowUpdate{s.id, swu}) + if w := t.fc.onRead(n); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) } - if cwu > 0 { - t.controlBuf.put(&windowUpdate{0, cwu}) + if w := s.fc.onRead(n); w > 0 { + t.controlBuf.put(&windowUpdate{s.id, w}) } } func (t *http2Client) handleData(f *http2.DataFrame) { - // Select the right stream to dispatch. size := len(f.Data()) + if err := t.fc.onData(uint32(size)); err != nil { + t.notifyError(ConnectionErrorf("%v", err)) + return + } + // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { - cwu, err := t.fc.adjustConnPendingUpdate(uint32(size)) - if err != nil { - t.notifyError(err) - return - } - if cwu > 0 { - t.controlBuf.put(&windowUpdate{0, cwu}) + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) } return } if size > 0 { + s.mu.Lock() + if s.state == streamDone { + s.mu.Unlock() + // The stream has been closed. Release the corresponding quota. + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } + return + } if err := s.fc.onData(uint32(size)); err != nil { - if _, ok := err.(ConnectionError); ok { - t.notifyError(err) - return - } - s.mu.Lock() - if s.state == streamDone { - s.mu.Unlock() - return - } s.state = streamDone s.statusCode = codes.Internal s.statusDesc = err.Error() @@ -603,6 +618,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } + s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index 68f82033..6f233d9d 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -139,15 +139,11 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI // operateHeader takes action on the decoded headers. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) { buf := newRecvBuffer() - fc := &inFlow{ - limit: initialWindowSize, - conn: t.fc, - } s := &Stream{ id: frame.Header().StreamID, st: t, buf: buf, - fc: fc, + fc: &inFlow{limit: initialWindowSize}, } var state decodeState @@ -307,42 +303,46 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Server) updateWindow(s *Stream, n uint32) { - swu, cwu := s.fc.onRead(n) - if swu > 0 { - t.controlBuf.put(&windowUpdate{s.id, swu}) + if w := t.fc.onRead(n); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) } - if cwu > 0 { - t.controlBuf.put(&windowUpdate{0, cwu}) + if w := s.fc.onRead(n); w > 0 { + t.controlBuf.put(&windowUpdate{s.id, w}) } } func (t *http2Server) handleData(f *http2.DataFrame) { - // Select the right stream to dispatch. size := len(f.Data()) + if err := t.fc.onData(uint32(size)); err != nil { + grpclog.Printf("transport: http2Server %v", err) + t.Close() + return + } + // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { - cwu, err := t.fc.adjustConnPendingUpdate(uint32(size)) - if err != nil { - grpclog.Printf("transport: http2Server %v", err) - t.Close() - return - } - if cwu > 0 { - t.controlBuf.put(&windowUpdate{0, cwu}) + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) } return } if size > 0 { - if err := s.fc.onData(uint32(size)); err != nil { - if _, ok := err.(ConnectionError); ok { - grpclog.Printf("transport: http2Server %v", err) - t.Close() - return + s.mu.Lock() + if s.state == streamDone { + s.mu.Unlock() + // The stream has been closed. Release the corresponding quota. + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) } + return + } + if err := s.fc.onData(uint32(size)); err != nil { + s.mu.Unlock() t.closeStream(s) t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } + s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? @@ -516,6 +516,10 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { // TODO(zhaoq): Support multi-writers for a single stream. var writeHeaderFrame bool s.mu.Lock() + if s.state == streamDone { + s.mu.Unlock() + return StreamErrorf(codes.Unknown, "the stream has been done") + } if !s.headerOk { writeHeaderFrame = true s.headerOk = true @@ -583,6 +587,10 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { // Got some quota. Try to acquire writing privilege on the // transport. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, ok := err.(StreamError); ok { + // Return the connection quota back. + t.sendQuotaPool.add(ps) + } if t.framer.adjustNumWriters(-1) == 0 { // This writer is the last one in this batch and has the // responsibility to flush the buffered frames. It queues @@ -592,6 +600,16 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } return err } + select { + case <-s.ctx.Done(): + t.sendQuotaPool.add(ps) + if t.framer.adjustNumWriters(-1) == 0 { + t.controlBuf.put(&flushIO{}) + } + t.writableChan <- 0 + return ContextErr(s.ctx.Err()) + default: + } var forceFlush bool if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { forceFlush = true @@ -689,20 +707,22 @@ func (t *http2Server) closeStream(s *Stream) { t.mu.Lock() delete(t.activeStreams, s.id) t.mu.Unlock() - if q := s.fc.restoreConn(); q > 0 { - t.controlBuf.put(&windowUpdate{0, q}) - } + // In case stream sending and receiving are invoked in separate + // goroutines (e.g., bi-directional streaming), cancel needs to be + // called to interrupt the potential blocking on other goroutines. + s.cancel() s.mu.Lock() + if q := s.fc.resetPendingData(); q > 0 { + if w := t.fc.onRead(q); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } + } if s.state == streamDone { s.mu.Unlock() return } s.state = streamDone s.mu.Unlock() - // In case stream sending and receiving are invoked in separate - // goroutines (e.g., bi-directional streaming), cancel needs to be - // called to interrupt the potential blocking on other goroutines. - s.cancel() } func (t *http2Server) RemoteAddr() net.Addr {