client: set grpc-accept-encoding to full list of registered compressors (#5541)

This commit is contained in:
Ronak Jain 2022-10-12 05:07:02 +05:30 committed by GitHub
parent c672451950
commit 7b817b4d18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 3 deletions

View File

@ -28,6 +28,8 @@ package encoding
import (
"io"
"strings"
"google.golang.org/grpc/internal/grpcutil"
)
// Identity specifies the optional encoding for uncompressed streams.
@ -73,6 +75,7 @@ var registeredCompressor = make(map[string]Compressor)
// registered with the same name, the one registered last will take effect.
func RegisterCompressor(c Compressor) {
registeredCompressor[c.Name()] = c
grpcutil.RegisteredCompressorNames = append(grpcutil.RegisteredCompressorNames, c.Name())
}
// GetCompressor returns Compressor for the given compressor name.

View File

@ -25,11 +25,15 @@ import (
)
const (
prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS"
)
var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
// AdvertiseCompressors is set if registered compressor should be advertised
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false")
)

View File

@ -0,0 +1,47 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcutil
import (
"strings"
"google.golang.org/grpc/internal/envconfig"
)
// RegisteredCompressorNames holds names of the registered compressors.
var RegisteredCompressorNames []string
// IsCompressorNameRegistered returns true when name is available in registry.
func IsCompressorNameRegistered(name string) bool {
for _, compressor := range RegisteredCompressorNames {
if compressor == name {
return true
}
}
return false
}
// RegisteredCompressors returns a string of registered compressor names
// separated by comma.
func RegisteredCompressors() string {
if !envconfig.AdvertiseCompressors {
return ""
}
return strings.Join(RegisteredCompressorNames, ",")
}

View File

@ -0,0 +1,46 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcutil
import (
"testing"
"google.golang.org/grpc/internal/envconfig"
)
func TestRegisteredCompressors(t *testing.T) {
defer func(c []string) { RegisteredCompressorNames = c }(RegisteredCompressorNames)
defer func(v bool) { envconfig.AdvertiseCompressors = v }(envconfig.AdvertiseCompressors)
RegisteredCompressorNames = []string{"gzip", "snappy"}
tests := []struct {
desc string
enabled bool
want string
}{
{desc: "compressor_ad_disabled", enabled: false, want: ""},
{desc: "compressor_ad_enabled", enabled: true, want: "gzip,snappy"},
}
for _, tt := range tests {
envconfig.AdvertiseCompressors = tt.enabled
compressors := RegisteredCompressors()
if compressors != tt.want {
t.Fatalf("Unexpected compressors got:%s, want:%s", compressors, tt.want)
}
}
}

View File

@ -110,6 +110,7 @@ type http2Client struct {
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
@ -300,6 +301,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@ -508,9 +510,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
}
registeredCompressors := t.registeredCompressors
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
// Include the outgoing compressor name when compressor is not registered
// via encoding.RegisterCompressor. This is possible when client uses
// WithCompressor dial option.
if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
if registeredCompressors != "" {
registeredCompressors += ","
}
registeredCompressors += callHdr.SendCompress
}
}
if registeredCompressors != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.

View File

@ -3250,6 +3250,7 @@ func testMetadataUnaryRPC(t *testing.T, e env) {
delete(header, "date") // the Date header is also optional
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
}
if !reflect.DeepEqual(header, testMetadata) {
t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
@ -3289,6 +3290,7 @@ func testMetadataOrderUnaryRPC(t *testing.T, e env) {
delete(header, "date") // the Date header is also optional
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
}
if !reflect.DeepEqual(header, newMetadata) {
@ -3401,6 +3403,8 @@ func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
}
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -3445,6 +3449,7 @@ func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
}
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -3488,6 +3493,7 @@ func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
}
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -3528,6 +3534,7 @@ func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
}
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -3591,6 +3598,7 @@ func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
}
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -3651,6 +3659,7 @@ func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
}
delete(header, "user-agent")
delete(header, "content-type")
delete(header, "grpc-accept-encoding")
expectedHeader := metadata.Join(testMetadata, testMetadata2)
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
@ -3982,6 +3991,7 @@ func testMetadataStreamingRPC(t *testing.T, e env) {
delete(headerMD, "trailer") // ignore if present
delete(headerMD, "user-agent")
delete(headerMD, "content-type")
delete(headerMD, "grpc-accept-encoding")
if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
}
@ -3990,6 +4000,7 @@ func testMetadataStreamingRPC(t *testing.T, e env) {
delete(headerMD, "trailer") // ignore if present
delete(headerMD, "user-agent")
delete(headerMD, "content-type")
delete(headerMD, "grpc-accept-encoding")
if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
}
@ -5432,6 +5443,72 @@ func (s) TestForceServerCodec(t *testing.T) {
}
}
// renameCompressor is a grpc.Compressor wrapper that allows customizing the
// Type() of another compressor.
type renameCompressor struct {
grpc.Compressor
name string
}
func (r *renameCompressor) Type() string { return r.name }
// renameDecompressor is a grpc.Decompressor wrapper that allows customizing the
// Type() of another Decompressor.
type renameDecompressor struct {
grpc.Decompressor
name string
}
func (r *renameDecompressor) Type() string { return r.name }
func (s) TestClientForwardsGrpcAcceptEncodingHeader(t *testing.T) {
wantGrpcAcceptEncodingCh := make(chan []string, 1)
defer close(wantGrpcAcceptEncodingCh)
compressor := renameCompressor{Compressor: grpc.NewGZIPCompressor(), name: "testgzip"}
decompressor := renameDecompressor{Decompressor: grpc.NewGZIPDecompressor(), name: "testgzip"}
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.Internal, "no metadata in context")
}
if got, want := md["grpc-accept-encoding"], <-wantGrpcAcceptEncodingCh; !reflect.DeepEqual(got, want) {
return nil, status.Errorf(codes.Internal, "got grpc-accept-encoding=%q; want [%q]", got, want)
}
return &testpb.Empty{}, nil
},
}
if err := ss.Start([]grpc.ServerOption{grpc.RPCDecompressor(&decompressor)}); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantGrpcAcceptEncodingCh <- []string{"gzip"}
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
}
wantGrpcAcceptEncodingCh <- []string{"gzip"}
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}, grpc.UseCompressor("gzip")); err != nil {
t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
}
// Use compressor directly which is not registered via
// encoding.RegisterCompressor.
if err := ss.StartClient(grpc.WithCompressor(&compressor)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
wantGrpcAcceptEncodingCh <- []string{"gzip,testgzip"}
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
}
}
func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
const mdkey = "somedata"