diff --git a/clientconn.go b/clientconn.go index e81a48868..038ed884c 100644 --- a/clientconn.go +++ b/clientconn.go @@ -91,12 +91,16 @@ func WithCodec(c Codec) DialOption { } } +// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message +// compressor. func WithCompressor(f CompressorGenerator) DialOption { return func(o *dialOptions) { o.cg = f } } +// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating +// message decompressor. func WithDecompressor(f DecompressorGenerator) DialOption { return func(o *dialOptions) { o.dg = f diff --git a/rpc_util.go b/rpc_util.go index f48ad32c7..427b49e00 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -78,11 +78,15 @@ func (protoCodec) String() string { return "proto" } +// Compressor defines the interface gRPC uses to compress a message. type Compressor interface { + // Do compresses p into w. Do(w io.Writer, p []byte) error + // Type returns the compression algorithm the Compressor uses. Type() string } +// NewGZIPCompressor creates a Compressor based on GZIP. func NewGZIPCompressor() Compressor { return &gzipCompressor{} } @@ -102,14 +106,18 @@ func (c *gzipCompressor) Type() string { return "gzip" } +// Decompressor defines the interface gRPC uses to decompress a message. type Decompressor interface { + // Do reads the data from r and uncompress them. Do(r io.Reader) ([]byte, error) + // Type returns the compression algorithm the Decompressor uses. Type() string } type gzipDecompressor struct { } +// NewGZIPDecompressor creates a Decompressor based on GZIP. func NewGZIPDecompressor() Decompressor { return &gzipDecompressor{} } @@ -127,8 +135,10 @@ func (d *gzipDecompressor) Type() string { return "gzip" } +// CompressorGenerator defines the function generating a Compressor. type CompressorGenerator func() Compressor +// DecompressorGenerator defines the function generating a Decompressor. type DecompressorGenerator func() Decompressor // callInfo contains all related configuration and information about an RPC. diff --git a/test/end2end_test.go b/test/end2end_test.go index 7823ba5ee..22ca8778d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -563,7 +563,7 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA) } tearDown(s, cc) - ctx, _ = context.WithTimeout(context.Background(), 5 * time.Second) + ctx, _ = context.WithTimeout(context.Background(), 5*time.Second) if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil { t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, ", grpc.Ready, err) } @@ -823,12 +823,12 @@ func testCancelNoIO(t *testing.T, e env) { go func() { defer close(ch) // This should be blocked until the 1st is canceled. - ctx, _ := context.WithTimeout(context.Background(), 2 * time.Second) + ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) if _, err := tc.StreamingInputCall(ctx); err != nil { t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) } }() - cancel(); + cancel() <-ch } diff --git a/transport/http2_client.go b/transport/http2_client.go index a23f6a723..7006cd879 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -208,13 +208,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { } // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ - id: t.nextID, - method: callHdr.Method, - sendCompress: callHdr.SendCompress, - buf: newRecvBuffer(), - fc: fc, - sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), - headerChan: make(chan struct{}), + id: t.nextID, + method: callHdr.Method, + sendCompress: callHdr.SendCompress, + buf: newRecvBuffer(), + fc: fc, + sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), + headerChan: make(chan struct{}), } t.nextID += 2 s.windowHandler = func(n int) { diff --git a/transport/transport.go b/transport/transport.go index 9d6f4647a..8017bc154 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -170,13 +170,13 @@ type Stream struct { ctx context.Context cancel context.CancelFunc // method records the associated RPC method of the stream. - method string + method string recvCompress string - sendCompress string - buf *recvBuffer - dec io.Reader - fc *inFlow - recvQuota uint32 + sendCompress string + buf *recvBuffer + dec io.Reader + fc *inFlow + recvQuota uint32 // The accumulated inbound quota pending for window update. updateQuota uint32 // The handler to control the window update procedure for both this @@ -358,8 +358,8 @@ type Options struct { // CallHdr carries the information of a particular RPC. type CallHdr struct { - Host string // peer host - Method string // the operation to perform on the specified host + Host string // peer host + Method string // the operation to perform on the specified host RecvCompress string SendCompress string }