Pass address attributes from balancer to creds handshaker. (#3548)

This commit is contained in:
Easwar Swaminathan 2020-04-23 11:03:42 -07:00 committed by GitHub
parent 8f94cb18c0
commit 6a3c03883d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 338 additions and 12 deletions

View File

@ -111,6 +111,9 @@ type NewSubConnOptions struct {
// CredsBundle is the credentials bundle that will be used in the created
// SubConn. If it's nil, the original creds from grpc DialOptions will be
// used.
//
// Deprecated: Use the Attributes field in resolver.Address to pass
// arbitrary data to the credential handshaker.
CredsBundle credentials.Bundle
// HealthCheckEnabled indicates whether health check service should be
// enabled on this SubConn

View File

@ -29,6 +29,7 @@ import (
"net"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/internal"
)
@ -124,15 +125,18 @@ var ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gR
// TransportCredentials defines the common interface for all the live gRPC wire
// protocols and supported transport security protocols (e.g., TLS, SSL).
type TransportCredentials interface {
// ClientHandshake does the authentication handshake specified by the corresponding
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
// The auth information should embed CommonAuthInfo to return additional information about
// the credentials. Implementations must use the provided context to implement timely cancellation.
// gRPC will try to reconnect if the error returned is a temporary error
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
// If the returned error is a wrapper error, implementations should make sure that
// ClientHandshake does the authentication handshake specified by the
// corresponding authentication protocol on rawConn for clients. It returns
// the authenticated connection and the corresponding auth information
// about the connection. The auth information should embed CommonAuthInfo
// to return additional information about the credentials. Implementations
// must use the provided context to implement timely cancellation. gRPC
// will try to reconnect if the error returned is a temporary error
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true). If the
// returned error is a wrapper error, implementations should make sure that
// the error implements Temporary() to have the correct retry behaviors.
// Additionally, ClientHandshakeInfo data will be available via the context
// passed to this call.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
@ -193,6 +197,31 @@ func RequestInfoFromContext(ctx context.Context) (ri RequestInfo, ok bool) {
return
}
// ClientHandshakeInfo holds data to be passed to ClientHandshake. This makes
// it possible to pass arbitrary data to the handshaker from gRPC, resolver,
// balancer etc. Individual credential implementations control the actual
// format of the data that they are willing to receive.
//
// This API is experimental.
type ClientHandshakeInfo struct {
// Attributes contains the attributes for the address. It could be provided
// by the gRPC, resolver, balancer etc.
Attributes *attributes.Attributes
}
// clientHandshakeInfoKey is a struct used as the key to store
// ClientHandshakeInfo in a context.
type clientHandshakeInfoKey struct{}
// ClientHandshakeInfoFromContext returns the ClientHandshakeInfo struct stored
// in ctx.
//
// This API is experimental.
func ClientHandshakeInfoFromContext(ctx context.Context) ClientHandshakeInfo {
chi, _ := ctx.Value(clientHandshakeInfoKey{}).(ClientHandshakeInfo)
return chi
}
// CheckSecurityLevel checks if a connection's security level is greater than or equal to the specified one.
// It returns success if 1) the condition is satisified or 2) AuthInfo struct does not implement GetCommonAuthInfo() method
// or 3) CommonAuthInfo.SecurityLevel has an invalid zero value. For 2) and 3), it is for the purpose of backward-compatibility.
@ -223,6 +252,9 @@ func init() {
internal.NewRequestInfoContext = func(ctx context.Context, ri RequestInfo) context.Context {
return context.WithValue(ctx, requestInfoKey{}, ri)
}
internal.NewClientHandshakeInfoContext = func(ctx context.Context, chi ClientHandshakeInfo) context.Context {
return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
}
}
// ChannelzSecurityInfo defines the interface that security protocols should implement

View File

@ -0,0 +1,101 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package stub implements a balancer for testing purposes.
package stub
import "google.golang.org/grpc/balancer"
// BalancerFuncs contains all balancer.Balancer functions with a preceding
// *BalancerData parameter for passing additional instance information. Any
// nil functions will never be called.
type BalancerFuncs struct {
// Init is called after ClientConn and BuildOptions are set in
// BalancerData. It may be used to initialize BalancerData.Data.
Init func(*BalancerData)
UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState)
Close func(*BalancerData)
}
// BalancerData contains data relevant to a stub balancer.
type BalancerData struct {
// ClientConn is set by the builder.
ClientConn balancer.ClientConn
// BuildOptions is set by the builder.
BuildOptions balancer.BuildOptions
// Data may be used to store arbitrary user data.
Data interface{}
}
type bal struct {
// TODO: Remove this once the legacy balancer API is removed. See
// https://github.com/grpc/grpc-go/pull/3431.
balancer.Balancer
bf BalancerFuncs
bd *BalancerData
}
func (b *bal) UpdateClientConnState(c balancer.ClientConnState) error {
if b.bf.UpdateClientConnState != nil {
return b.bf.UpdateClientConnState(b.bd, c)
}
return nil
}
func (b *bal) ResolverError(e error) {
if b.bf.ResolverError != nil {
b.bf.ResolverError(b.bd, e)
}
}
func (b *bal) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if b.bf.UpdateSubConnState != nil {
b.bf.UpdateSubConnState(b.bd, sc, scs)
}
}
func (b *bal) Close() {
if b.bf.Close != nil {
b.bf.Close(b.bd)
}
}
type bb struct {
name string
bf BalancerFuncs
}
func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &bal{bf: bb.bf, bd: &BalancerData{ClientConn: cc, BuildOptions: opts}}
if b.bf.Init != nil {
b.bf.Init(b.bd)
}
return b
}
func (bb bb) Name() string { return bb.name }
// Register registers a stub balancer builder which will call the provided
// functions. The name used should be unique.
func Register(name string, bf BalancerFuncs) {
balancer.Register(bb{name: name, bf: bf})
}

View File

@ -40,6 +40,9 @@ var (
// NewRequestInfoContext creates a new context based on the argument context attaching
// the passed in RequestInfo to the new context.
NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
// NewClientHandshakeInfoContext returns a copy of the input context with
// the passed in ClientHandshakeInfo struct added to it.
NewClientHandshakeInfoContext interface{} // func(context.Context, credentials.ClientHandshakeInfo) context.Context
// ParseServiceConfigForTesting is for creating a fake
// ClientConn for resolver testing only
ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult

View File

@ -215,6 +215,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}
if transportCreds != nil {
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
contextWithHandshakeInfo := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context)
connectCtx = contextWithHandshakeInfo(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
scheme = "https"
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {

View File

@ -34,9 +34,12 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/testutils"
@ -1816,3 +1819,54 @@ func (s) TestHeaderTblSize(t *testing.T) {
t.Fatalf("expected len(limits) = 2 within 10s, got != 2")
}
}
// attrTransportCreds is a transport credential implementation which stores
// Attributes from the ClientHandshakeInfo struct passed in the context locally
// for the test to inspect.
type attrTransportCreds struct {
credentials.TransportCredentials
attr *attributes.Attributes
}
func (ac *attrTransportCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
ai := credentials.ClientHandshakeInfoFromContext(ctx)
ac.attr = ai.Attributes
return rawConn, nil, nil
}
func (ac *attrTransportCreds) Info() credentials.ProtocolInfo {
return credentials.ProtocolInfo{}
}
func (ac *attrTransportCreds) Clone() credentials.TransportCredentials {
return nil
}
// TestClientHandshakeInfo adds attributes to the resolver.Address passes to
// NewClientTransport and verifies that these attributes are received by the
// transport credential handshaker.
func (s) TestClientHandshakeInfo(t *testing.T) {
server := setUpServerOnly(t, 0, &ServerConfig{}, pingpong)
defer server.stop()
const (
testAttrKey = "foo"
testAttrVal = "bar"
)
addr := resolver.Address{
Addr: "localhost:" + server.port,
Attributes: attributes.New(testAttrKey, testAttrVal),
}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()
creds := &attrTransportCreds{}
tr, err := NewClientTransport(ctx, context.Background(), addr, ConnectOptions{TransportCredentials: creds}, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
defer tr.Close()
wantAttr := attributes.New(testAttrKey, testAttrVal)
if gotAttr := creds.attr; !cmp.Equal(gotAttr, wantAttr, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("received attributes %v in creds, want %v", gotAttr, wantAttr)
}
}

View File

@ -20,20 +20,27 @@ package test
import (
"context"
"fmt"
"net"
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata"
)
@ -43,7 +50,7 @@ const testBalancerName = "testbalancer"
// testBalancer creates one subconn with the first address from resolved
// addresses.
//
// It's used to test options for NewSubConn are applies correctly.
// It's used to test whether options for NewSubConn are applied correctly.
type testBalancer struct {
cc balancer.ClientConn
sc balancer.SubConn
@ -96,8 +103,7 @@ func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectiv
}
}
func (b *testBalancer) Close() {
}
func (b *testBalancer) Close() {}
type picker struct {
err error
@ -346,3 +352,123 @@ func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) {
t.Fatalf("With both backend and grpclb addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses)
}
}
type aiPicker struct {
result balancer.PickResult
err error
}
func (aip *aiPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
return aip.result, aip.err
}
// attrTransportCreds is a transport credential implementation which stores
// Attributes from the ClientHandshakeInfo struct passed in the context locally
// for the test to inspect.
type attrTransportCreds struct {
credentials.TransportCredentials
attr *attributes.Attributes
}
func (ac *attrTransportCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
ai := credentials.ClientHandshakeInfoFromContext(ctx)
ac.attr = ai.Attributes
return rawConn, nil, nil
}
func (ac *attrTransportCreds) Info() credentials.ProtocolInfo {
return credentials.ProtocolInfo{}
}
func (ac *attrTransportCreds) Clone() credentials.TransportCredentials {
return nil
}
// TestAddressAttributesInNewSubConn verifies that the Attributes passed from a
// balancer in the resolver.Address that is passes to NewSubConn reaches all the
// way to the ClientHandshake method of the credentials configured on the parent
// channel.
func (s) TestAddressAttributesInNewSubConn(t *testing.T) {
const (
testAttrKey = "foo"
testAttrVal = "bar"
attrBalancerName = "attribute-balancer"
)
// Register a stub balancer which adds attributes to the first address that
// it receives and then calls NewSubConn on it.
bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
addrs := ccs.ResolverState.Addresses
if len(addrs) == 0 {
return nil
}
// Only use the first address.
attr := attributes.New(testAttrKey, testAttrVal)
addrs[0].Attributes = attr
sc, err := bd.ClientConn.NewSubConn([]resolver.Address{addrs[0]}, balancer.NewSubConnOptions{})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: state.ConnectivityState, Picker: &aiPicker{result: balancer.PickResult{SubConn: sc}, err: state.ConnectionError}})
},
}
stub.Register(attrBalancerName, bf)
t.Logf("Registered balancer %s...", attrBalancerName)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
t.Logf("Registered manual resolver with scheme %s...", r.Scheme())
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
s := grpc.NewServer()
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
t.Logf("Started gRPC server at %s...", lis.Addr().String())
creds := &attrTransportCreds{}
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, attrBalancerName)),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatal(err)
}
defer cc.Close()
tc := testpb.NewTestServiceClient(cc)
t.Log("Created a ClientConn...")
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
t.Log("Made an RPC which was expected to fail...")
state := resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}
r.UpdateState(state)
t.Logf("Pushing resolver state update: %v through the manual resolver", state)
// The second RPC should succeed.
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
t.Log("Made an RPC which succeeded...")
wantAttr := attributes.New(testAttrKey, testAttrVal)
if gotAttr := creds.attr; !cmp.Equal(gotAttr, wantAttr, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("received attributes %v in creds, want %v", gotAttr, wantAttr)
}
}

3
vet.sh
View File

@ -126,7 +126,8 @@ staticcheck -go 1.9 -checks 'inherit,-ST1015' ./... > "${SC_OUT}" || true
# Error if anything other than deprecation warnings are printed.
not grep -v "is deprecated:.*SA1019" "${SC_OUT}"
# Only ignore the following deprecated types/fields/functions.
not grep -Fv '.HandleResolvedAddrs
not grep -Fv '.CredsBundle
.HandleResolvedAddrs
.HandleSubConnStateChange
.HeaderMap
.NewAddress