conflict resolve

This commit is contained in:
Mahak Mukhi 2017-03-06 13:19:09 -08:00
commit 661dbbc817
33 changed files with 999 additions and 197 deletions

View File

@ -4,6 +4,7 @@ go:
- 1.5.4
- 1.6.3
- 1.7
- 1.8
go_import_path: google.golang.org/grpc
@ -14,6 +15,6 @@ before_install:
script:
- '! gofmt -s -d -l . 2>&1 | read'
- '! goimports -l . | read'
- 'if [[ $TRAVIS_GO_VERSION != 1.5* ]]; then ! golint ./... | grep -vE "(_string|\.pb)\.go:"; fi'
- 'if [[ $TRAVIS_GO_VERSION != 1.5* ]]; then ! golint ./... | grep -vE "(_mock|_string|\.pb)\.go:"; fi'
- '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf" | grep -vF .pb.go:' # https://github.com/golang/protobuf/issues/214
- make test testrace

View File

@ -0,0 +1,121 @@
# Mocking Service for gRPC
[Example code](https://github.com/grpc/grpc-go/tree/master/examples/helloworld/mock)
## Why?
To test client-side logic without the overhead of connecting to a real server. Mocking enables users to write light-weight unit tests to check functionalities on client-side without invoking RPC calls to a server.
## Idea: Mock the client stub that connects to the server.
We use Gomock to mock the client interface (in the generated code) and programmatically set its methods to expect and return pre-determined values. This enables users to write tests around the client logic and use this mocked stub while making RPC calls.
## How to use Gomock?
Documentation on Gomock can be found [here](https://github.com/golang/mock).
A quick reading of the documentation should enable users to follow the code below.
Consider a gRPC service based on following proto file:
```proto
//helloworld.proto
package helloworld;
message HelloRequest {
string name = 1;
}
message HelloReply {
string name = 1;
}
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
```
The generated file helloworld.pb.go will have a client interface for each service defined in the proto file. This interface will have methods corresponding to each rpc inside that service.
```Go
type GreeterClient interface {
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}
```
The generated code also contains a struct that implements this interface.
```Go
type greeterClient struct {
cc *grpc.ClientConn
}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error){
// ...
// gRPC specific code here
// ...
}
```
Along with this the generated code has a method to create an instance of this struct.
```Go
func NewGreeterClient(cc *grpc.ClientConn) GreeterClient
```
The user code uses this function to create an instance of the struct greeterClient which then can be used to make rpc calls to the server.
We will mock this interface GreeterClient and use an instance of that mock to make rpc calls. These calls instead of going to server will return pre-determined values.
To create a mock well use [mockgen](https://github.com/golang/mock#running-mockgen).
From the directory ``` examples/helloworld/mock/ ``` run ``` mockgen google.golang.org/grpc/examples/helloworld/helloworld GreeterClient > mock_helloworld/hw_mock.go ```
Notice that in the above command we specify GreeterClient as the interface to be mocked.
The user test code can import the package generated by mockgen along with library package gomock to write unit tests around client-side logic.
```Go
import "github.com/golang/mock/gomock"
import hwmock "google.golang.org/grpc/examples/helloworld/mock/mock_helloworld"
```
An instance of the mocked interface can be created as:
```Go
mockGreeterClient := hwmock.NewMockGreeterClient(ctrl)
```
This mocked object can be programmed to expect calls to its methods and return pre-determined values. For instance, we can program mockGreeterClient to expect a call to its method SayHello and return a HelloReply with message “Mocked RPC”.
```Go
mockGreeterClient.EXPECT().SayHello(
gomock.Any(), // expect any value for first parameter
gomock.Any(), // expect any value for second parameter
).Return(&helloworld.HelloReply{Message: “Mocked RPC”}, nil)
```
gomock.Any() indicates that the parameter can have any value or type. We can indicate specific values for built-in types with gomock.Eq().
However, if the test code needs to specify the parameter to have a proto message type, we can replace gomock.Any() with an instance of a struct that implements gomock.Matcher interface.
```Go
type rpcMsg struct {
msg proto.Message
}
func (r *rpcMsg) Matches(msg interface{}) bool {
m, ok := msg.(proto.Message)
if !ok {
return false
}
return proto.Equal(m, r.msg)
}
func (r *rpcMsg) String() string {
return fmt.Sprintf("is %s", r.msg)
}
...
req := &helloworld.HelloRequest{Name: "unit_test"}
mockGreeterClient.EXPECT().SayHello(
gomock.Any(),
&rpcMsg{msg: req},
).Return(&helloworld.HelloReply{Message: "Mocked Interface"}, nil)
```

View File

@ -47,7 +47,8 @@ import (
"google.golang.org/grpc/grpclog"
)
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
// Allows reuse of the same testpb.Payload object.
func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
if size < 0 {
grpclog.Fatalf("Requested a response with invalid length %d", size)
}
@ -59,10 +60,15 @@ func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
default:
grpclog.Fatalf("Unsupported payload type: %d", t)
}
return &testpb.Payload{
Type: t,
Body: body,
}
p.Type = t
p.Body = body
return
}
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
p := new(testpb.Payload)
setPayload(p, t, size)
return p
}
type testServer struct {
@ -75,8 +81,13 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
}
func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
response := &testpb.SimpleResponse{
Payload: new(testpb.Payload),
}
in := new(testpb.SimpleRequest)
for {
in, err := stream.Recv()
// use ServerStream directly to reuse the same testpb.SimpleRequest object
err := stream.(grpc.ServerStream).RecvMsg(in)
if err == io.EOF {
// read done.
return nil
@ -84,9 +95,8 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
if err != nil {
return err
}
if err := stream.Send(&testpb.SimpleResponse{
Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
}); err != nil {
setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
if err := stream.Send(response); err != nil {
return err
}
}

View File

@ -42,6 +42,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -85,6 +86,9 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
return nil
}

View File

@ -271,6 +271,15 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
}
}
// WithAuthority returns a DialOption that specifies the value to be used as
// the :authority pseudo-header. This value only works with WithInsecure and
// has no effect if TransportCredentials are present.
func WithAuthority(a string) DialOption {
return func(o *dialOptions) {
o.copts.Authority = a
}
}
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
@ -329,6 +338,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
@ -774,6 +785,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
// Don't call cancel in success path due to a race in Go 1.6:
// https://github.com/golang/go/issues/15078.
if err != nil {
cancel()

View File

@ -85,6 +85,34 @@ func TestTLSServerNameOverwrite(t *testing.T) {
}
}
func TestWithAuthority(t *testing.T) {
overwriteServerName := "over.write.server.name"
conn, err := Dial("Non-Existent.Server:80", WithInsecure(), WithAuthority(overwriteServerName))
if err != nil {
t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
}
conn.Close()
if conn.authority != overwriteServerName {
t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
}
}
func TestWithAuthorityAndTLS(t *testing.T) {
overwriteServerName := "over.write.server.name"
creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", overwriteServerName)
if err != nil {
t.Fatalf("Failed to create credentials %v", err)
}
conn, err := Dial("Non-Existent.Server:80", WithTransportCredentials(creds), WithAuthority("no.effect.authority"))
if err != nil {
t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
}
conn.Close()
if conn.authority != overwriteServerName {
t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
}
}
func TestDialContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

View File

@ -1,4 +1,5 @@
// +build go1.7
// +build !go1.8
/*
*
@ -44,8 +45,6 @@ import (
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
//
// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}

View File

@ -0,0 +1,53 @@
// +build go1.8
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package credentials
import (
"crypto/tls"
)
// cloneTLSConfig returns a shallow clone of the exported
// fields of cfg, ignoring the unexported sync.Once, which
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
}
return cfg.Clone()
}

View File

@ -44,8 +44,6 @@ import (
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
//
// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}

View File

@ -0,0 +1,49 @@
package mock
import (
"fmt"
"testing"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
helloworld "google.golang.org/grpc/examples/helloworld/helloworld"
hwmock "google.golang.org/grpc/examples/helloworld/mock/mock_helloworld"
)
// rpcMsg implements the gomock.Matcher interface
type rpcMsg struct {
msg proto.Message
}
func (r *rpcMsg) Matches(msg interface{}) bool {
m, ok := msg.(proto.Message)
if !ok {
return false
}
return proto.Equal(m, r.msg)
}
func (r *rpcMsg) String() string {
return fmt.Sprintf("is %s", r.msg)
}
func TestSayHello(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockGreeterClient := hwmock.NewMockGreeterClient(ctrl)
req := &helloworld.HelloRequest{Name: "unit_test"}
mockGreeterClient.EXPECT().SayHello(
gomock.Any(),
&rpcMsg{msg: req},
).Return(&helloworld.HelloReply{Message: "Mocked Interface"}, nil)
testSayHello(t, mockGreeterClient)
}
func testSayHello(t *testing.T, client helloworld.GreeterClient) {
r, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "unit_test"})
if err != nil || r.Message != "Mocked Interface" {
t.Errorf("mocking failed")
}
t.Log("Reply : ", r.Message)
}

View File

@ -0,0 +1,48 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: google.golang.org/grpc/examples/helloworld/helloworld (interfaces: GreeterClient)
package mock_helloworld
import (
gomock "github.com/golang/mock/gomock"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
helloworld "google.golang.org/grpc/examples/helloworld/helloworld"
)
// Mock of GreeterClient interface
type MockGreeterClient struct {
ctrl *gomock.Controller
recorder *_MockGreeterClientRecorder
}
// Recorder for MockGreeterClient (not exported)
type _MockGreeterClientRecorder struct {
mock *MockGreeterClient
}
func NewMockGreeterClient(ctrl *gomock.Controller) *MockGreeterClient {
mock := &MockGreeterClient{ctrl: ctrl}
mock.recorder = &_MockGreeterClientRecorder{mock}
return mock
}
func (_m *MockGreeterClient) EXPECT() *_MockGreeterClientRecorder {
return _m.recorder
}
func (_m *MockGreeterClient) SayHello(_param0 context.Context, _param1 *helloworld.HelloRequest, _param2 ...grpc.CallOption) (*helloworld.HelloReply, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "SayHello", _s...)
ret0, _ := ret[0].(*helloworld.HelloReply)
ret1, _ := ret[1].(error)
return ret0, ret1
}
func (_mr *_MockGreeterClientRecorder) SayHello(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "SayHello", _s...)
}

View File

@ -52,7 +52,7 @@ var (
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
serverHost = flag.String("server_host", "127.0.0.1", "The server host name")
serverHost = flag.String("server_host", "localhost", "The server host name")
serverPort = flag.Int("server_port", 10000, "The server port number")
tlsServerName = flag.String("server_host_override", "", "The server name use to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
testCase = flag.String("test_case", "large_unary",

View File

@ -0,0 +1,174 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*
* Client used to test http2 error edge cases like GOAWAYs and RST_STREAMs
*
* Documentation:
* https://github.com/grpc/grpc/blob/master/doc/negative-http2-interop-test-descriptions.md
*/
package main
import (
"flag"
"net"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/interop"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var (
serverHost = flag.String("server_host", "127.0.0.1", "The server host name")
serverPort = flag.Int("server_port", 8080, "The server port number")
testCase = flag.String("test_case", "goaway",
`Configure different test cases. Valid options are:
goaway : client sends two requests, the server will send a goaway in between;
rst_after_header : server will send rst_stream after it sends headers;
rst_during_data : server will send rst_stream while sending data;
rst_after_data : server will send rst_stream after sending data;
ping : server will send pings between each http2 frame;
max_streams : server will ensure that the max_concurrent_streams limit is upheld;`)
largeReqSize = 271828
largeRespSize = 314159
)
func largeSimpleRequest() *testpb.SimpleRequest {
pl := interop.ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
return &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
Payload: pl,
}
}
// sends two unary calls. The server asserts that the calls use different connections.
func goaway(tc testpb.TestServiceClient) {
interop.DoLargeUnaryCall(tc)
// sleep to ensure that the client has time to recv the GOAWAY.
// TODO(ncteisen): make this less hacky.
time.Sleep(1 * time.Second)
interop.DoLargeUnaryCall(tc)
}
func rstAfterHeader(tc testpb.TestServiceClient) {
req := largeSimpleRequest()
reply, err := tc.UnaryCall(context.Background(), req)
if reply != nil {
grpclog.Fatalf("Client received reply despite server sending rst stream after header")
}
if grpc.Code(err) != codes.Internal {
grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Internal)
}
}
func rstDuringData(tc testpb.TestServiceClient) {
req := largeSimpleRequest()
reply, err := tc.UnaryCall(context.Background(), req)
if reply != nil {
grpclog.Fatalf("Client received reply despite server sending rst stream during data")
}
if grpc.Code(err) != codes.Unknown {
grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Unknown)
}
}
func rstAfterData(tc testpb.TestServiceClient) {
req := largeSimpleRequest()
reply, err := tc.UnaryCall(context.Background(), req)
if reply != nil {
grpclog.Fatalf("Client received reply despite server sending rst stream after data")
}
if grpc.Code(err) != codes.Internal {
grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Internal)
}
}
func ping(tc testpb.TestServiceClient) {
// The server will assert that every ping it sends was ACK-ed by the client.
interop.DoLargeUnaryCall(tc)
}
func maxStreams(tc testpb.TestServiceClient) {
interop.DoLargeUnaryCall(tc)
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func() {
defer wg.Done()
interop.DoLargeUnaryCall(tc)
}()
}
wg.Wait()
}
func main() {
flag.Parse()
serverAddr := net.JoinHostPort(*serverHost, strconv.Itoa(*serverPort))
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
conn, err := grpc.Dial(serverAddr, opts...)
if err != nil {
grpclog.Fatalf("Fail to dial: %v", err)
}
defer conn.Close()
tc := testpb.NewTestServiceClient(conn)
switch *testCase {
case "goaway":
goaway(tc)
grpclog.Println("goaway done")
case "rst_after_header":
rstAfterHeader(tc)
grpclog.Println("rst_after_header done")
case "rst_during_data":
rstDuringData(tc)
grpclog.Println("rst_during_data done")
case "rst_after_data":
rstAfterData(tc)
grpclog.Println("rst_after_data done")
case "ping":
ping(tc)
grpclog.Println("ping done")
case "max_streams":
maxStreams(tc)
grpclog.Println("max_streams done")
default:
grpclog.Fatal("Unsupported test case: ", *testCase)
}
}

View File

@ -60,7 +60,8 @@ var (
trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
)
func clientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
// ClientNewPayload returns a payload of the given type and size.
func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
if size < 0 {
grpclog.Fatalf("Requested a response with invalid length %d", size)
}
@ -91,7 +92,7 @@ func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
// DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
@ -116,16 +117,14 @@ func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
}
var sum int
for _, s := range reqSizes {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
req := &testpb.StreamingInputCallRequest{
Payload: pl,
}
if err := stream.Send(req); err != nil {
grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err)
grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
}
sum += s
grpclog.Printf("Sent a request of size %d, aggregated size %d", s, sum)
}
reply, err := stream.CloseAndRecv()
if err != nil {
@ -173,7 +172,7 @@ func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
respCnt++
}
if rpcStatus != io.EOF {
grpclog.Fatalf("Failed to finish the server streaming rpc: %v", err)
grpclog.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
}
if respCnt != len(respSizes) {
grpclog.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
@ -193,14 +192,14 @@ func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
Size: proto.Int32(int32(respSizes[index])),
},
}
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: pl,
}
if err := stream.Send(req); err != nil {
grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err)
grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
}
reply, err := stream.Recv()
if err != nil {
@ -249,7 +248,7 @@ func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOpt
}
grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
}
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
Payload: pl,
@ -266,7 +265,7 @@ func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOpt
// DoComputeEngineCreds performs a unary RPC with compute engine auth.
func DoComputeEngineCreds(tc testpb.TestServiceClient, serviceAccount, oauthScope string) {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
@ -298,7 +297,7 @@ func getServiceAccountJSONKey(keyFile string) []byte {
// DoServiceAccountCreds performs a unary RPC with service account auth.
func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
@ -323,7 +322,7 @@ func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, o
// DoJWTTokenCreds performs a unary RPC with JWT token auth.
func DoJWTTokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile string) {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
@ -357,7 +356,7 @@ func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
// DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
@ -383,7 +382,7 @@ func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oaut
// DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
func DoPerRPCCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
@ -441,14 +440,14 @@ func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOp
Size: proto.Int32(31415),
},
}
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: pl,
}
if err := stream.Send(req); err != nil {
grpclog.Fatalf("%v.Send(%v) = %v", stream, req, err)
grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
}
if _, err := stream.Recv(); err != nil {
grpclog.Fatalf("%v.Recv() = %v", stream, err)
@ -486,7 +485,7 @@ func validateMetadata(header, trailer metadata.MD) {
// DoCustomMetadata checks that metadata is echoed back to the client.
func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
// Testing with UnaryCall.
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(1)),
@ -526,7 +525,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
Payload: pl,
}
if err := stream.Send(streamReq); err != nil {
grpclog.Fatalf("%v.Send(%v) = %v", stream, streamReq, err)
grpclog.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
}
streamHeader, err := stream.Header()
if err != nil {
@ -570,7 +569,7 @@ func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption
ResponseStatus: respStatus,
}
if err := stream.Send(streamReq); err != nil {
grpclog.Fatalf("%v.Send(%v) = %v, want <nil>", stream, streamReq, err)
grpclog.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
}
if err := stream.CloseSend(); err != nil {
grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)

View File

@ -32,6 +32,7 @@
*/
// Package metadata define the structure of the metadata supported by gRPC library.
// Please refer to http://www.grpc.io/docs/guides/wire.html for more information about custom-metadata.
package metadata // import "google.golang.org/grpc/metadata"
import (
@ -82,6 +83,7 @@ func DecodeKeyValue(k, v string) (string, string, error) {
type MD map[string][]string
// New creates a MD from given key-value map.
// Keys are automatically converted to lowercase. And for keys having "-bin" as suffix, their values will be applied Base64 encoding.
func New(m map[string]string) MD {
md := MD{}
for k, v := range m {
@ -93,6 +95,7 @@ func New(m map[string]string) MD {
// Pairs returns an MD formed by the mapping of key, value ...
// Pairs panics if len(kv) is odd.
// Keys are automatically converted to lowercase. And for keys having "-bin" as suffix, their values will be appplied Base64 encoding.
func Pairs(kv ...string) MD {
if len(kv)%2 == 1 {
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))

View File

@ -2,6 +2,22 @@
// source: proto2.proto
// DO NOT EDIT!
/*
Package grpc_testing is a generated protocol buffer package.
It is generated from these files:
proto2.proto
proto2_ext.proto
proto2_ext2.proto
test.proto
It has these top-level messages:
ToBeExtended
Extension
AnotherExtension
SearchResponse
SearchRequest
*/
package grpc_testing
import proto "github.com/golang/protobuf/proto"
@ -13,26 +29,32 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type ToBeExtened struct {
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type ToBeExtended struct {
Foo *int32 `protobuf:"varint,1,req,name=foo" json:"foo,omitempty"`
proto.XXX_InternalExtensions `json:"-"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ToBeExtened) Reset() { *m = ToBeExtened{} }
func (m *ToBeExtened) String() string { return proto.CompactTextString(m) }
func (*ToBeExtened) ProtoMessage() {}
func (*ToBeExtened) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} }
func (m *ToBeExtended) Reset() { *m = ToBeExtended{} }
func (m *ToBeExtended) String() string { return proto.CompactTextString(m) }
func (*ToBeExtended) ProtoMessage() {}
func (*ToBeExtended) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
var extRange_ToBeExtened = []proto.ExtensionRange{
{10, 20},
var extRange_ToBeExtended = []proto.ExtensionRange{
{10, 30},
}
func (*ToBeExtened) ExtensionRangeArray() []proto.ExtensionRange {
return extRange_ToBeExtened
func (*ToBeExtended) ExtensionRangeArray() []proto.ExtensionRange {
return extRange_ToBeExtended
}
func (m *ToBeExtened) GetFoo() int32 {
func (m *ToBeExtended) GetFoo() int32 {
if m != nil && m.Foo != nil {
return *m.Foo
}
@ -40,17 +62,17 @@ func (m *ToBeExtened) GetFoo() int32 {
}
func init() {
proto.RegisterType((*ToBeExtened)(nil), "grpc.testing.ToBeExtened")
proto.RegisterType((*ToBeExtended)(nil), "grpc.testing.ToBeExtended")
}
func init() { proto.RegisterFile("proto2.proto", fileDescriptor1) }
func init() { proto.RegisterFile("proto2.proto", fileDescriptor0) }
var fileDescriptor1 = []byte{
// 85 bytes of a gzipped FileDescriptorProto
var fileDescriptor0 = []byte{
// 86 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x28, 0xca, 0x2f,
0xc9, 0x37, 0xd2, 0x03, 0x53, 0x42, 0x3c, 0xe9, 0x45, 0x05, 0xc9, 0x7a, 0x25, 0xa9, 0xc5, 0x25,
0x99, 0x79, 0xe9, 0x4a, 0xaa, 0x5c, 0xdc, 0x21, 0xf9, 0x4e, 0xa9, 0xae, 0x15, 0x25, 0xa9, 0x79,
0xa9, 0x29, 0x42, 0x02, 0x5c, 0xcc, 0x69, 0xf9, 0xf9, 0x12, 0x8c, 0x0a, 0x4c, 0x1a, 0xac, 0x41,
0x20, 0xa6, 0x16, 0x0b, 0x07, 0x97, 0x80, 0x28, 0x20, 0x00, 0x00, 0xff, 0xff, 0xc9, 0xed, 0xbc,
0xc2, 0x43, 0x00, 0x00, 0x00,
0x99, 0x79, 0xe9, 0x4a, 0x6a, 0x5c, 0x3c, 0x21, 0xf9, 0x4e, 0xa9, 0xae, 0x15, 0x25, 0xa9, 0x79,
0x29, 0xa9, 0x29, 0x42, 0x02, 0x5c, 0xcc, 0x69, 0xf9, 0xf9, 0x12, 0x8c, 0x0a, 0x4c, 0x1a, 0xac,
0x41, 0x20, 0xa6, 0x16, 0x0b, 0x07, 0x97, 0x80, 0x3c, 0x20, 0x00, 0x00, 0xff, 0xff, 0x74, 0x86,
0x9c, 0x08, 0x44, 0x00, 0x00, 0x00,
}

View File

@ -2,7 +2,7 @@ syntax = "proto2";
package grpc.testing;
message ToBeExtened {
message ToBeExtended {
required int32 foo = 1;
extensions 10 to 20;
extensions 10 to 30;
}

View File

@ -2,20 +2,6 @@
// source: proto2_ext.proto
// DO NOT EDIT!
/*
Package grpc_testing is a generated protocol buffer package.
It is generated from these files:
proto2_ext.proto
proto2.proto
test.proto
It has these top-level messages:
Extension
ToBeExtened
SearchResponse
SearchRequest
*/
package grpc_testing
import proto "github.com/golang/protobuf/proto"
@ -27,62 +13,71 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Extension struct {
Baz *int32 `protobuf:"varint,1,opt,name=baz" json:"baz,omitempty"`
Whatzit *int32 `protobuf:"varint,1,opt,name=whatzit" json:"whatzit,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Extension) Reset() { *m = Extension{} }
func (m *Extension) String() string { return proto.CompactTextString(m) }
func (*Extension) ProtoMessage() {}
func (*Extension) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (*Extension) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} }
func (m *Extension) GetBaz() int32 {
if m != nil && m.Baz != nil {
return *m.Baz
func (m *Extension) GetWhatzit() int32 {
if m != nil && m.Whatzit != nil {
return *m.Whatzit
}
return 0
}
var E_Bar = &proto.ExtensionDesc{
ExtendedType: (*ToBeExtened)(nil),
var E_Foo = &proto.ExtensionDesc{
ExtendedType: (*ToBeExtended)(nil),
ExtensionType: (*int32)(nil),
Field: 13,
Name: "grpc.testing.foo",
Tag: "varint,13,opt,name=foo",
Filename: "proto2_ext.proto",
}
var E_Bar = &proto.ExtensionDesc{
ExtendedType: (*ToBeExtended)(nil),
ExtensionType: (*Extension)(nil),
Field: 17,
Name: "grpc.testing.bar",
Tag: "varint,13,opt,name=bar",
Tag: "bytes,17,opt,name=bar",
Filename: "proto2_ext.proto",
}
var E_Baz = &proto.ExtensionDesc{
ExtendedType: (*ToBeExtened)(nil),
ExtensionType: (*Extension)(nil),
Field: 17,
ExtendedType: (*ToBeExtended)(nil),
ExtensionType: (*SearchRequest)(nil),
Field: 19,
Name: "grpc.testing.baz",
Tag: "bytes,17,opt,name=baz",
Tag: "bytes,19,opt,name=baz",
Filename: "proto2_ext.proto",
}
func init() {
proto.RegisterType((*Extension)(nil), "grpc.testing.Extension")
proto.RegisterExtension(E_Foo)
proto.RegisterExtension(E_Bar)
proto.RegisterExtension(E_Baz)
}
func init() { proto.RegisterFile("proto2_ext.proto", fileDescriptor0) }
func init() { proto.RegisterFile("proto2_ext.proto", fileDescriptor1) }
var fileDescriptor0 = []byte{
// 130 bytes of a gzipped FileDescriptorProto
var fileDescriptor1 = []byte{
// 179 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x28, 0x28, 0xca, 0x2f,
0xc9, 0x37, 0x8a, 0x4f, 0xad, 0x28, 0xd1, 0x03, 0x33, 0x85, 0x78, 0xd2, 0x8b, 0x0a, 0x92, 0xf5,
0x4a, 0x52, 0x8b, 0x4b, 0x32, 0xf3, 0xd2, 0xa5, 0x78, 0x20, 0xf2, 0x10, 0x39, 0x25, 0x59, 0x2e,
0x4e, 0xd7, 0x8a, 0x92, 0xd4, 0xbc, 0xe2, 0xcc, 0xfc, 0x3c, 0x21, 0x01, 0x2e, 0xe6, 0xa4, 0xc4,
0x2a, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xd6, 0x20, 0x10, 0xd3, 0x4a, 0x1b, 0x24, 0x52, 0x24, 0x24,
0xa9, 0x87, 0x6c, 0x84, 0x5e, 0x48, 0xbe, 0x53, 0x2a, 0x58, 0x57, 0x6a, 0x8a, 0x04, 0x2f, 0x4c,
0x71, 0x91, 0x95, 0x0b, 0x58, 0x3b, 0x3e, 0xc5, 0x82, 0x40, 0xc5, 0xdc, 0x46, 0xe2, 0xa8, 0x0a,
0xe0, 0xf6, 0x83, 0xad, 0x04, 0x04, 0x00, 0x00, 0xff, 0xff, 0x59, 0xfa, 0x16, 0xbc, 0xc0, 0x00,
0x00, 0x00,
0x4a, 0x52, 0x8b, 0x4b, 0x32, 0xf3, 0xd2, 0xa5, 0x78, 0x20, 0xf2, 0x10, 0x39, 0x29, 0x2e, 0x90,
0x30, 0x84, 0xad, 0xa4, 0xca, 0xc5, 0xe9, 0x5a, 0x51, 0x92, 0x9a, 0x57, 0x9c, 0x99, 0x9f, 0x27,
0x24, 0xc1, 0xc5, 0x5e, 0x9e, 0x91, 0x58, 0x52, 0x95, 0x59, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1,
0x1a, 0x04, 0xe3, 0x5a, 0xe9, 0x70, 0x31, 0xa7, 0xe5, 0xe7, 0x0b, 0x49, 0xe9, 0x21, 0x1b, 0xab,
0x17, 0x92, 0xef, 0x94, 0x0a, 0xd6, 0x9d, 0x92, 0x9a, 0x22, 0xc1, 0x0b, 0xd6, 0x01, 0x52, 0x66,
0xe5, 0xca, 0xc5, 0x9c, 0x94, 0x58, 0x84, 0x57, 0xb5, 0xa0, 0x02, 0xa3, 0x06, 0xb7, 0x91, 0x38,
0xaa, 0x0a, 0xb8, 0x4b, 0x82, 0x40, 0xfa, 0xad, 0x3c, 0x41, 0xc6, 0x54, 0xe1, 0x35, 0x46, 0x18,
0x6c, 0x8c, 0x34, 0xaa, 0x8a, 0xe0, 0xd4, 0xc4, 0xa2, 0xe4, 0x8c, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4,
0xe2, 0x12, 0x90, 0x51, 0x55, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x71, 0x6b, 0x94, 0x9f, 0x21,
0x01, 0x00, 0x00,
}

View File

@ -3,11 +3,14 @@ syntax = "proto2";
package grpc.testing;
import "proto2.proto";
extend ToBeExtened {
optional int32 bar = 13;
optional Extension baz = 17;
import "test.proto";
extend ToBeExtended {
optional int32 foo = 13;
optional Extension bar = 17;
optional SearchRequest baz = 19;
}
message Extension {
optional int32 baz = 1;
optional int32 whatzit = 1;
}

View File

@ -0,0 +1,72 @@
// Code generated by protoc-gen-go.
// source: proto2_ext2.proto
// DO NOT EDIT!
package grpc_testing
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type AnotherExtension struct {
Whatchamacallit *int32 `protobuf:"varint,1,opt,name=whatchamacallit" json:"whatchamacallit,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *AnotherExtension) Reset() { *m = AnotherExtension{} }
func (m *AnotherExtension) String() string { return proto.CompactTextString(m) }
func (*AnotherExtension) ProtoMessage() {}
func (*AnotherExtension) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} }
func (m *AnotherExtension) GetWhatchamacallit() int32 {
if m != nil && m.Whatchamacallit != nil {
return *m.Whatchamacallit
}
return 0
}
var E_Frob = &proto.ExtensionDesc{
ExtendedType: (*ToBeExtended)(nil),
ExtensionType: (*string)(nil),
Field: 23,
Name: "grpc.testing.frob",
Tag: "bytes,23,opt,name=frob",
Filename: "proto2_ext2.proto",
}
var E_Nitz = &proto.ExtensionDesc{
ExtendedType: (*ToBeExtended)(nil),
ExtensionType: (*AnotherExtension)(nil),
Field: 29,
Name: "grpc.testing.nitz",
Tag: "bytes,29,opt,name=nitz",
Filename: "proto2_ext2.proto",
}
func init() {
proto.RegisterType((*AnotherExtension)(nil), "grpc.testing.AnotherExtension")
proto.RegisterExtension(E_Frob)
proto.RegisterExtension(E_Nitz)
}
func init() { proto.RegisterFile("proto2_ext2.proto", fileDescriptor2) }
var fileDescriptor2 = []byte{
// 165 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0x28, 0xca, 0x2f,
0xc9, 0x37, 0x8a, 0x4f, 0xad, 0x28, 0x31, 0xd2, 0x03, 0xb3, 0x85, 0x78, 0xd2, 0x8b, 0x0a, 0x92,
0xf5, 0x4a, 0x52, 0x8b, 0x4b, 0x32, 0xf3, 0xd2, 0xa5, 0x78, 0x20, 0x0a, 0x20, 0x72, 0x4a, 0x36,
0x5c, 0x02, 0x8e, 0x79, 0xf9, 0x25, 0x19, 0xa9, 0x45, 0xae, 0x15, 0x25, 0xa9, 0x79, 0xc5, 0x99,
0xf9, 0x79, 0x42, 0x1a, 0x5c, 0xfc, 0xe5, 0x19, 0x89, 0x25, 0xc9, 0x19, 0x89, 0xb9, 0x89, 0xc9,
0x89, 0x39, 0x39, 0x99, 0x25, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0xac, 0x41, 0xe8, 0xc2, 0x56, 0x7a,
0x5c, 0x2c, 0x69, 0x45, 0xf9, 0x49, 0x42, 0x52, 0x7a, 0xc8, 0x56, 0xe8, 0x85, 0xe4, 0x3b, 0xa5,
0x82, 0x8d, 0x4b, 0x49, 0x4d, 0x91, 0x10, 0x57, 0x60, 0xd4, 0xe0, 0x0c, 0x02, 0xab, 0xb3, 0xf2,
0xe3, 0x62, 0xc9, 0xcb, 0x2c, 0xa9, 0xc2, 0xab, 0x5e, 0x56, 0x81, 0x51, 0x83, 0xdb, 0x48, 0x0e,
0x55, 0x05, 0xba, 0x1b, 0x83, 0xc0, 0xe6, 0x00, 0x02, 0x00, 0x00, 0xff, 0xff, 0xf0, 0x7e, 0x0d,
0x26, 0xed, 0x00, 0x00, 0x00,
}

View File

@ -0,0 +1,14 @@
syntax = "proto2";
package grpc.testing;
import "proto2.proto";
extend ToBeExtended {
optional string frob = 23;
optional AnotherExtension nitz = 29;
}
message AnotherExtension {
optional int32 whatchamacallit = 1;
}

View File

@ -25,7 +25,7 @@ type SearchResponse struct {
func (m *SearchResponse) Reset() { *m = SearchResponse{} }
func (m *SearchResponse) String() string { return proto.CompactTextString(m) }
func (*SearchResponse) ProtoMessage() {}
func (*SearchResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} }
func (*SearchResponse) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{0} }
func (m *SearchResponse) GetResults() []*SearchResponse_Result {
if m != nil {
@ -43,7 +43,28 @@ type SearchResponse_Result struct {
func (m *SearchResponse_Result) Reset() { *m = SearchResponse_Result{} }
func (m *SearchResponse_Result) String() string { return proto.CompactTextString(m) }
func (*SearchResponse_Result) ProtoMessage() {}
func (*SearchResponse_Result) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0, 0} }
func (*SearchResponse_Result) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{0, 0} }
func (m *SearchResponse_Result) GetUrl() string {
if m != nil {
return m.Url
}
return ""
}
func (m *SearchResponse_Result) GetTitle() string {
if m != nil {
return m.Title
}
return ""
}
func (m *SearchResponse_Result) GetSnippets() []string {
if m != nil {
return m.Snippets
}
return nil
}
type SearchRequest struct {
Query string `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"`
@ -52,7 +73,14 @@ type SearchRequest struct {
func (m *SearchRequest) Reset() { *m = SearchRequest{} }
func (m *SearchRequest) String() string { return proto.CompactTextString(m) }
func (*SearchRequest) ProtoMessage() {}
func (*SearchRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} }
func (*SearchRequest) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{1} }
func (m *SearchRequest) GetQuery() string {
if m != nil {
return m.Query
}
return ""
}
func init() {
proto.RegisterType((*SearchResponse)(nil), "grpc.testing.SearchResponse")
@ -198,23 +226,23 @@ var _SearchService_serviceDesc = grpc.ServiceDesc{
Metadata: "test.proto",
}
func init() { proto.RegisterFile("test.proto", fileDescriptor2) }
func init() { proto.RegisterFile("test.proto", fileDescriptor3) }
var fileDescriptor2 = []byte{
// 227 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64,
0xe6, 0xa5, 0x2b, 0xcd, 0x65, 0xe4, 0xe2, 0x0b, 0x4e, 0x4d, 0x2c, 0x4a, 0xce, 0x08, 0x4a, 0x2d,
0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x15, 0xb2, 0xe5, 0x62, 0x2f, 0x4a, 0x2d, 0x2e, 0xcd, 0x29, 0x29,
0x96, 0x60, 0x54, 0x60, 0xd6, 0xe0, 0x36, 0x52, 0xd6, 0x43, 0xd6, 0xa2, 0x87, 0xaa, 0x5c, 0x2f,
0x08, 0xac, 0x36, 0x08, 0xa6, 0x47, 0xca, 0x87, 0x8b, 0x0d, 0x22, 0x24, 0x24, 0xc0, 0xc5, 0x5c,
0x5a, 0x94, 0x03, 0x34, 0x84, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe1, 0x62, 0x2d, 0xc9,
0x2c, 0xc9, 0x49, 0x95, 0x60, 0x02, 0x8b, 0x41, 0x38, 0x42, 0x52, 0x5c, 0x1c, 0xc5, 0x79, 0x99,
0x05, 0x05, 0xa9, 0x40, 0x1b, 0x99, 0x81, 0x36, 0x72, 0x06, 0xc1, 0xf9, 0x4a, 0xaa, 0x5c, 0xbc,
0x30, 0xfb, 0x0a, 0x4b, 0x81, 0x0e, 0x00, 0x19, 0x01, 0x64, 0x14, 0x55, 0x42, 0x8d, 0x85, 0x70,
0x8c, 0x96, 0x31, 0xc2, 0xd4, 0x05, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x0a, 0x39, 0x73, 0xb1,
0x41, 0x04, 0x84, 0xa4, 0xb1, 0x3b, 0x1f, 0x6c, 0x9c, 0x94, 0x0c, 0x3e, 0xbf, 0x09, 0x05, 0x70,
0xf1, 0x07, 0x97, 0x14, 0xa5, 0x26, 0xe6, 0x02, 0xe5, 0x28, 0x36, 0x4d, 0x83, 0xd1, 0x80, 0x31,
0x89, 0x0d, 0x1c, 0x09, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0xd6, 0x09, 0xb8, 0x92,
0x01, 0x00, 0x00,
var fileDescriptor3 = []byte{
// 231 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x91, 0xbd, 0x4a, 0xc5, 0x40,
0x10, 0x85, 0x59, 0x83, 0xd1, 0x3b, 0xfe, 0x32, 0x58, 0x84, 0x68, 0x11, 0xae, 0x08, 0xa9, 0x16,
0xb9, 0xd6, 0x56, 0xb6, 0x16, 0xb2, 0x79, 0x82, 0x6b, 0x18, 0xe2, 0x42, 0x4c, 0x36, 0x33, 0x13,
0xc1, 0x87, 0xb1, 0xf5, 0x39, 0x25, 0x59, 0x23, 0x0a, 0x62, 0x63, 0xb7, 0xe7, 0xe3, 0xcc, 0xb7,
0xbb, 0x0c, 0x80, 0x92, 0xa8, 0x0d, 0xdc, 0x6b, 0x8f, 0x87, 0x0d, 0x87, 0xda, 0x4e, 0xc0, 0x77,
0xcd, 0xfa, 0xcd, 0xc0, 0x71, 0x45, 0x5b, 0xae, 0x9f, 0x1c, 0x49, 0xe8, 0x3b, 0x21, 0xbc, 0x85,
0x3d, 0x26, 0x19, 0x5b, 0x95, 0xcc, 0x14, 0x49, 0x79, 0xb0, 0xb9, 0xb4, 0xdf, 0x47, 0xec, 0xcf,
0xba, 0x75, 0x73, 0xd7, 0x2d, 0x33, 0xf9, 0x3d, 0xa4, 0x11, 0xe1, 0x29, 0x24, 0x23, 0xb7, 0x99,
0x29, 0x4c, 0xb9, 0x72, 0xd3, 0x11, 0xcf, 0x60, 0x57, 0xbd, 0xb6, 0x94, 0xed, 0xcc, 0x2c, 0x06,
0xcc, 0x61, 0x5f, 0x3a, 0x1f, 0x02, 0xa9, 0x64, 0x49, 0x91, 0x94, 0x2b, 0xf7, 0x95, 0xd7, 0x57,
0x70, 0xb4, 0xdc, 0x37, 0x8c, 0x24, 0x3a, 0x29, 0x86, 0x91, 0xf8, 0xf5, 0x53, 0x1b, 0xc3, 0xe6,
0xdd, 0x2c, 0xbd, 0x8a, 0xf8, 0xc5, 0xd7, 0x84, 0x77, 0x90, 0x46, 0x80, 0xe7, 0xbf, 0x3f, 0x7f,
0xd6, 0xe5, 0x17, 0x7f, 0xfd, 0x0d, 0x1f, 0xe0, 0xa4, 0x52, 0xa6, 0xed, 0xb3, 0xef, 0x9a, 0x7f,
0xdb, 0x4a, 0x73, 0x6d, 0x1e, 0xd3, 0x79, 0x09, 0x37, 0x1f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x20,
0xd6, 0x09, 0xb8, 0x92, 0x01, 0x00, 0x00,
}

View File

@ -156,9 +156,7 @@ func (s *serverReflectionServer) fileDescContainingExtension(st reflect.Type, ex
return nil, fmt.Errorf("failed to find registered extension for extension number %v", ext)
}
extT := reflect.TypeOf(extDesc.ExtensionType).Elem()
return s.fileDescForType(extT)
return s.decodeFileDesc(proto.FileDescriptor(extDesc.Filename))
}
func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {

View File

@ -51,13 +51,15 @@ import (
var (
s = &serverReflectionServer{}
// fileDescriptor of each test proto file.
fdTest *dpb.FileDescriptorProto
fdProto2 *dpb.FileDescriptorProto
fdProto2Ext *dpb.FileDescriptorProto
fdTest *dpb.FileDescriptorProto
fdProto2 *dpb.FileDescriptorProto
fdProto2Ext *dpb.FileDescriptorProto
fdProto2Ext2 *dpb.FileDescriptorProto
// fileDescriptor marshalled.
fdTestByte []byte
fdProto2Byte []byte
fdProto2ExtByte []byte
fdTestByte []byte
fdProto2Byte []byte
fdProto2ExtByte []byte
fdProto2Ext2Byte []byte
)
func loadFileDesc(filename string) (*dpb.FileDescriptorProto, []byte) {
@ -80,6 +82,7 @@ func init() {
fdTest, fdTestByte = loadFileDesc("test.proto")
fdProto2, fdProto2Byte = loadFileDesc("proto2.proto")
fdProto2Ext, fdProto2ExtByte = loadFileDesc("proto2_ext.proto")
fdProto2Ext2, fdProto2Ext2Byte = loadFileDesc("proto2_ext2.proto")
}
func TestFileDescForType(t *testing.T) {
@ -88,7 +91,7 @@ func TestFileDescForType(t *testing.T) {
wantFd *dpb.FileDescriptorProto
}{
{reflect.TypeOf(pb.SearchResponse_Result{}), fdTest},
{reflect.TypeOf(pb.ToBeExtened{}), fdProto2},
{reflect.TypeOf(pb.ToBeExtended{}), fdProto2},
} {
fd, err := s.fileDescForType(test.st)
if err != nil || !reflect.DeepEqual(fd, test.wantFd) {
@ -128,7 +131,11 @@ func TestFileDescContainingExtension(t *testing.T) {
extNum int32
want *dpb.FileDescriptorProto
}{
{reflect.TypeOf(pb.ToBeExtened{}), 17, fdProto2Ext},
{reflect.TypeOf(pb.ToBeExtended{}), 13, fdProto2Ext},
{reflect.TypeOf(pb.ToBeExtended{}), 17, fdProto2Ext},
{reflect.TypeOf(pb.ToBeExtended{}), 19, fdProto2Ext},
{reflect.TypeOf(pb.ToBeExtended{}), 23, fdProto2Ext2},
{reflect.TypeOf(pb.ToBeExtended{}), 29, fdProto2Ext2},
} {
fd, err := s.fileDescContainingExtension(test.st, test.extNum)
if err != nil || !reflect.DeepEqual(fd, test.want) {
@ -149,7 +156,7 @@ func TestAllExtensionNumbersForType(t *testing.T) {
st reflect.Type
want []int32
}{
{reflect.TypeOf(pb.ToBeExtened{}), []int32{13, 17}},
{reflect.TypeOf(pb.ToBeExtended{}), []int32{13, 17, 19, 23, 29}},
} {
r, err := s.allExtensionNumbersForType(test.st)
sort.Sort(intArray(r))
@ -278,7 +285,7 @@ func testFileContainingSymbol(t *testing.T, stream rpb.ServerReflection_ServerRe
{"grpc.testing.SearchService.Search", fdTestByte},
{"grpc.testing.SearchService.StreamingSearch", fdTestByte},
{"grpc.testing.SearchResponse", fdTestByte},
{"grpc.testing.ToBeExtened", fdProto2Byte},
{"grpc.testing.ToBeExtended", fdProto2Byte},
} {
if err := stream.Send(&rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileContainingSymbol{
@ -309,7 +316,7 @@ func testFileContainingSymbolError(t *testing.T, stream rpb.ServerReflection_Ser
"grpc.testing.SerchService",
"grpc.testing.SearchService.SearchE",
"grpc.tesing.SearchResponse",
"gpc.testing.ToBeExtened",
"gpc.testing.ToBeExtended",
} {
if err := stream.Send(&rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileContainingSymbol{
@ -338,7 +345,11 @@ func testFileContainingExtension(t *testing.T, stream rpb.ServerReflection_Serve
extNum int32
want []byte
}{
{"grpc.testing.ToBeExtened", 17, fdProto2ExtByte},
{"grpc.testing.ToBeExtended", 13, fdProto2ExtByte},
{"grpc.testing.ToBeExtended", 17, fdProto2ExtByte},
{"grpc.testing.ToBeExtended", 19, fdProto2ExtByte},
{"grpc.testing.ToBeExtended", 23, fdProto2Ext2Byte},
{"grpc.testing.ToBeExtended", 29, fdProto2Ext2Byte},
} {
if err := stream.Send(&rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileContainingExtension{
@ -372,8 +383,8 @@ func testFileContainingExtensionError(t *testing.T, stream rpb.ServerReflection_
typeName string
extNum int32
}{
{"grpc.testing.ToBExtened", 17},
{"grpc.testing.ToBeExtened", 15},
{"grpc.testing.ToBExtended", 17},
{"grpc.testing.ToBeExtended", 15},
} {
if err := stream.Send(&rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileContainingExtension{
@ -404,7 +415,7 @@ func testAllExtensionNumbersOfType(t *testing.T, stream rpb.ServerReflection_Ser
typeName string
want []int32
}{
{"grpc.testing.ToBeExtened", []int32{13, 17}},
{"grpc.testing.ToBeExtended", []int32{13, 17, 19, 23, 29}},
} {
if err := stream.Send(&rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_AllExtensionNumbersOfType{
@ -435,7 +446,7 @@ func testAllExtensionNumbersOfType(t *testing.T, stream rpb.ServerReflection_Ser
func testAllExtensionNumbersOfTypeError(t *testing.T, stream rpb.ServerReflection_ServerReflectionInfoClient) {
for _, test := range []string{
"grpc.testing.ToBeExtenedE",
"grpc.testing.ToBeExtendedE",
} {
if err := stream.Send(&rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_AllExtensionNumbersOfType{

View File

@ -48,6 +48,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -140,6 +141,7 @@ type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
}
@ -183,12 +185,20 @@ func Trailer(md *metadata.MD) CallOption {
})
}
// Peer returns a CallOption that retrieves peer information for a
// unary RPC.
func Peer(peer *peer.Peer) CallOption {
return afterCall(func(c *callInfo) {
*peer = *c.peer
})
}
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md. Note: failFast is default to true.
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
@ -367,7 +377,7 @@ type rpcError struct {
}
func (e *rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
return fmt.Sprintf("rpc error: code = %s desc = %s", e.code, e.desc)
}
// Code returns the error code for err if it was produced by the rpc system.
@ -486,17 +496,17 @@ type MethodConfig struct {
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized,
// uncompressed payload in bytes. The actual value used is the minumum of the value
// specified here and the value set by the application via the gRPC client API. If
// either one is not set, then the other will be used. If neither is set, then the
// built-in default is used.
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minumum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
// TODO: support this.
MaxReqSize uint64
MaxReqSize uint32
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
// TODO: support this.
MaxRespSize uint64
MaxRespSize uint32
}
// ServiceConfig is provided by the service provider and contains parameters for how

View File

@ -116,6 +116,7 @@ type options struct {
statsHandler stats.Handler
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@ -208,6 +209,24 @@ func StatsHandler(h stats.Handler) ServerOption {
}
}
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
// stream, and the invocation passes through interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
return func(o *options) {
o.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
Handler: streamHandler,
// We need to assume that the users of the streamHandler will want to use both.
ClientStreams: true,
ServerStreams: true,
}
}
}
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@ -815,15 +834,19 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}
var appErr error
var server interface{}
if srv != nil {
server = srv.server
}
if s.opts.streamInt == nil {
appErr = sd.Handler(srv.server, ss)
appErr = sd.Handler(server, ss)
} else {
info := &StreamServerInfo{
FullMethod: stream.Method(),
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
appErr = s.opts.streamInt(server, ss, info, sd.Handler)
}
if appErr != nil {
if err, ok := appErr.(*rpcError); ok {
@ -883,6 +906,10 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
@ -913,6 +940,10 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
errDesc := fmt.Sprintf("unknown method %v", method)
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {

View File

@ -60,7 +60,7 @@ func TestStopBeforeServe(t *testing.T) {
// server.Serve is responsible for closing the listener, even if the
// server was already stopped.
err = lis.Close()
if got, want := ErrorDesc(err), "use of closed network connection"; !strings.Contains(got, want) {
if got, want := ErrorDesc(err), "use of closed"; !strings.Contains(got, want) {
t.Errorf("Close() error = %q, want %q", got, want)
}
}

View File

@ -428,6 +428,7 @@ type test struct {
streamClientInt grpc.StreamClientInterceptor
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
unknownHandler grpc.StreamHandler
sc <-chan grpc.ServiceConfig
// srv and srvAddr are set once startServer is called.
@ -493,10 +494,13 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
if te.streamServerInt != nil {
sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
}
if te.unknownHandler != nil {
sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
}
la := "localhost:0"
switch te.e.network {
case "unix":
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now())
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
syscall.Unlink(la)
}
lis, err := net.Listen(te.e.network, la)
@ -1234,6 +1238,33 @@ func testHealthCheckOff(t *testing.T, e env) {
}
}
func TestUnknownHandler(t *testing.T) {
defer leakCheck(t)()
// An example unknownHandler that returns a different code and a different method, making sure that we do not
// expose what methods are implemented to a client that is not authenticated.
unknownHandler := func(srv interface{}, stream grpc.ServerStream) error {
return grpc.Errorf(codes.Unauthenticated, "user unauthenticated")
}
for _, e := range listTestEnv() {
// TODO(bradfitz): Temporarily skip this env due to #619.
if e.name == "handler-tls" {
continue
}
testUnknownHandler(t, e, unknownHandler)
}
}
func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
te := newTest(t, e)
te.unknownHandler = unknownHandler
te.startServer(&testServer{security: e.security})
defer te.tearDown()
want := grpc.Errorf(codes.Unauthenticated, "user unauthenticated")
if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !equalErrors(err, want) {
t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
}
}
func TestHealthCheckServingStatus(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
@ -1440,6 +1471,43 @@ func testExceedMsgLimit(t *testing.T, e env) {
}
}
func TestPeerClientSide(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testPeerClientSide(t, e)
}
}
func testPeerClientSide(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
peer := new(peer.Peer)
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
pa := peer.Addr.String()
if e.network == "unix" {
if pa != te.srvAddr {
t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
}
return
}
_, pp, err := net.SplitHostPort(pa)
if err != nil {
t.Fatalf("Failed to parse address from peer.")
}
_, sp, err := net.SplitHostPort(te.srvAddr)
if err != nil {
t.Fatalf("Failed to parse address of test server.")
}
if pp != sp {
t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
}
}
func TestMetadataUnaryRPC(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
@ -2634,6 +2702,48 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
}
}
const defaultMaxStreamsClient = 100
func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testExceedDefaultMaxStreamsLimit(t, e)
}
}
func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise(
"http2Client.notifyError got notified that the client transport was broken",
"Conn.resetTransport failed to create client transport",
"grpc: the connection is closing",
)
// When masStream is set to 0 the server doesn't send a settings frame for
// MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
// In such a case, there should be a default cap on the client-side.
te.maxStream = 0
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// Create as many streams as a client can.
for i := 0; i < defaultMaxStreamsClient; i++ {
if _, err := tc.StreamingInputCall(te.ctx); err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
}
}
// Trying to create one more should timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := tc.StreamingInputCall(ctx)
if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
}
}
func TestStreamsQuotaRecovery(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {

View File

@ -51,6 +51,7 @@ const (
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
)
// The following defines various control items which could flow through

View File

@ -188,7 +188,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) {
},
RequestURI: "/service/foo.bar",
},
wantErr: `stream error: code = 13 desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`,
wantErr: `stream error: code = Internal desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`,
},
{
name: "with metadata",

View File

@ -226,7 +226,8 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
state: reachable,
activeStreams: make(map[uint32]*Stream),
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
maxStreams: defaultMaxStreamsClient,
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
@ -362,21 +363,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok && checkStreamsQuota {
if _, ok := err.(StreamError); ok {
t.streamsQuota.add(1)
}
return nil, err
@ -384,9 +382,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
if checkStreamsQuota {
t.streamsQuota.add(1)
}
t.streamsQuota.add(1)
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
@ -408,16 +404,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
var reset bool
if !checkStreamsQuota && t.streamsQuota != nil {
reset = true
}
t.mu.Unlock()
if reset {
t.streamsQuota.add(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
@ -525,15 +512,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
var updateStreams bool
t.mu.Lock()
if t.activeStreams == nil {
t.mu.Unlock()
return
}
if t.streamsQuota != nil {
updateStreams = true
}
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
@ -542,10 +525,25 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return
}
t.mu.Unlock()
if updateStreams {
t.streamsQuota.add(1)
}
// rstStream is true in case the stream is being closed at the client-side
// and the server needs to be intimated about it by sending a RST_STREAM
// frame.
// To make sure this frame is written to the wire before the headers of the
// next stream waiting for streamsQuota, we add to streamsQuota pool only
// after having acquired the writableChan to send RST_STREAM out (look at
// the controller() routine).
var rstStream bool
defer func() {
// In case, the client doesn't have to send RST_STREAM to server
// we can safely add back to streamsQuota pool now.
if !rstStream {
t.streamsQuota.add(1)
return
}
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
}()
s.mu.Lock()
rstStream = s.rstStream
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
@ -562,7 +560,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
s.state = streamDone
s.mu.Unlock()
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
rstStream = true
}
}
@ -803,10 +801,10 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
s.rstStream = true
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
s.mu.Unlock()
@ -1079,16 +1077,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
s.Val = math.MaxInt32
}
t.mu.Lock()
reset := t.streamsQuota != nil
if !reset {
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
}
ms := t.maxStreams
t.maxStreams = int(s.Val)
t.mu.Unlock()
if reset {
t.streamsQuota.add(int(s.Val) - ms)
}
t.streamsQuota.add(int(s.Val) - ms)
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
@ -1121,6 +1113,12 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
// If the server needs to be to intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
// the wire before the headers of the next stream waiting on
// streamQuota. We ensure this by adding to the streamsQuota pool
// only after having acquired the writableChan to send RST_STREAM.
t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()

View File

@ -214,6 +214,9 @@ type Stream struct {
// the status received from the server.
statusCode codes.Code
statusDesc string
// rstStream indicates whether a RST_STREAM frame needs to be sent
// to the server to signify that this stream is closing.
rstStream bool
}
// RecvCompress returns the compression algorithm applied to the inbound
@ -375,6 +378,9 @@ func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (S
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Authority is the :authority pseudo-header to use. This field has no effect if
// TransportCredentials is set.
Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
@ -568,7 +574,7 @@ type StreamError struct {
}
func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
// ContextErr converts the error from context package into a StreamError.

View File

@ -633,7 +633,10 @@ func TestMaxStreams(t *testing.T) {
case <-cc.streamsQuota.acquire():
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
default:
if cc.streamsQuota.quota != 0 {
cc.streamsQuota.mu.Lock()
quota := cc.streamsQuota.quota
cc.streamsQuota.mu.Unlock()
if quota != 0 {
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
}
}