mirror of https://github.com/grpc/grpc-go.git
resolver: allow config selector to return an RPC error (#4082)
This commit is contained in:
parent
f64220f6e1
commit
750abe8f95
|
@ -109,11 +109,11 @@ type defaultConfigSelector struct {
|
||||||
sc *ServiceConfig
|
sc *ServiceConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) *iresolver.RPCConfig {
|
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
|
||||||
return &iresolver.RPCConfig{
|
return &iresolver.RPCConfig{
|
||||||
Context: rpcInfo.Context,
|
Context: rpcInfo.Context,
|
||||||
MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
|
MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DialContext creates a client connection to the given target. By default, it's
|
// DialContext creates a client connection to the given target. By default, it's
|
||||||
|
|
|
@ -29,8 +29,10 @@ import (
|
||||||
|
|
||||||
// ConfigSelector controls what configuration to use for every RPC.
|
// ConfigSelector controls what configuration to use for every RPC.
|
||||||
type ConfigSelector interface {
|
type ConfigSelector interface {
|
||||||
// Selects the configuration for the RPC.
|
// Selects the configuration for the RPC, or terminates it using the error.
|
||||||
SelectConfig(RPCInfo) *RPCConfig
|
// This error will be converted by the gRPC library to a status error with
|
||||||
|
// code UNKNOWN if it is not returned as a status error.
|
||||||
|
SelectConfig(RPCInfo) (*RPCConfig, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RPCInfo contains RPC information needed by a ConfigSelector.
|
// RPCInfo contains RPC information needed by a ConfigSelector.
|
||||||
|
@ -86,7 +88,7 @@ func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectConfig defers to the current ConfigSelector in scs.
|
// SelectConfig defers to the current ConfigSelector in scs.
|
||||||
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) *RPCConfig {
|
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
|
||||||
scs.mu.RLock()
|
scs.mu.RLock()
|
||||||
defer scs.mu.RUnlock()
|
defer scs.mu.RUnlock()
|
||||||
return scs.cs.SelectConfig(r)
|
return scs.cs.SelectConfig(r)
|
||||||
|
|
|
@ -36,10 +36,10 @@ func Test(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeConfigSelector struct {
|
type fakeConfigSelector struct {
|
||||||
selectConfig func(RPCInfo) *RPCConfig
|
selectConfig func(RPCInfo) (*RPCConfig, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeConfigSelector) SelectConfig(r RPCInfo) *RPCConfig {
|
func (f *fakeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
|
||||||
return f.selectConfig(r)
|
return f.selectConfig(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,21 +59,21 @@ func (s) TestSafeConfigSelector(t *testing.T) {
|
||||||
cs2Called := make(chan struct{})
|
cs2Called := make(chan struct{})
|
||||||
|
|
||||||
cs1 := &fakeConfigSelector{
|
cs1 := &fakeConfigSelector{
|
||||||
selectConfig: func(r RPCInfo) *RPCConfig {
|
selectConfig: func(r RPCInfo) (*RPCConfig, error) {
|
||||||
cs1Called <- struct{}{}
|
cs1Called <- struct{}{}
|
||||||
if diff := cmp.Diff(r, testRPCInfo); diff != "" {
|
if diff := cmp.Diff(r, testRPCInfo); diff != "" {
|
||||||
t.Errorf("SelectConfig(%v) called; want %v\n Diffs:\n%s", r, testRPCInfo, diff)
|
t.Errorf("SelectConfig(%v) called; want %v\n Diffs:\n%s", r, testRPCInfo, diff)
|
||||||
}
|
}
|
||||||
return <-retChan1
|
return <-retChan1, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
cs2 := &fakeConfigSelector{
|
cs2 := &fakeConfigSelector{
|
||||||
selectConfig: func(r RPCInfo) *RPCConfig {
|
selectConfig: func(r RPCInfo) (*RPCConfig, error) {
|
||||||
cs2Called <- struct{}{}
|
cs2Called <- struct{}{}
|
||||||
if diff := cmp.Diff(r, testRPCInfo); diff != "" {
|
if diff := cmp.Diff(r, testRPCInfo); diff != "" {
|
||||||
t.Errorf("SelectConfig(%v) called; want %v\n Diffs:\n%s", r, testRPCInfo, diff)
|
t.Errorf("SelectConfig(%v) called; want %v\n Diffs:\n%s", r, testRPCInfo, diff)
|
||||||
}
|
}
|
||||||
return <-retChan2
|
return <-retChan2, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,9 +82,9 @@ func (s) TestSafeConfigSelector(t *testing.T) {
|
||||||
|
|
||||||
cs1Returned := make(chan struct{})
|
cs1Returned := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
got := scs.SelectConfig(testRPCInfo) // blocks until send to retChan1
|
got, err := scs.SelectConfig(testRPCInfo) // blocks until send to retChan1
|
||||||
if got != resp1 {
|
if err != nil || got != resp1 {
|
||||||
t.Errorf("SelectConfig(%v) = %v; want %v", testRPCInfo, got, resp1)
|
t.Errorf("SelectConfig(%v) = %v, %v; want %v, nil", testRPCInfo, got, err, resp1)
|
||||||
}
|
}
|
||||||
close(cs1Returned)
|
close(cs1Returned)
|
||||||
}()
|
}()
|
||||||
|
@ -112,7 +112,8 @@ func (s) TestSafeConfigSelector(t *testing.T) {
|
||||||
for dl := time.Now().Add(150 * time.Millisecond); !time.Now().After(dl); {
|
for dl := time.Now().Add(150 * time.Millisecond); !time.Now().After(dl); {
|
||||||
gotConfigChan := make(chan *RPCConfig)
|
gotConfigChan := make(chan *RPCConfig)
|
||||||
go func() {
|
go func() {
|
||||||
gotConfigChan <- scs.SelectConfig(testRPCInfo)
|
cfg, _ := scs.SelectConfig(testRPCInfo)
|
||||||
|
gotConfigChan <- cfg
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-time.After(500 * time.Millisecond):
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
|
|
@ -175,7 +175,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||||
|
|
||||||
var mc serviceconfig.MethodConfig
|
var mc serviceconfig.MethodConfig
|
||||||
var onCommit func()
|
var onCommit func()
|
||||||
rpcConfig := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
|
rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Convert(err).Err()
|
||||||
|
}
|
||||||
if rpcConfig != nil {
|
if rpcConfig != nil {
|
||||||
if rpcConfig.Context != nil {
|
if rpcConfig.Context != nil {
|
||||||
ctx = rpcConfig.Context
|
ctx = rpcConfig.Context
|
||||||
|
|
|
@ -20,11 +20,13 @@ package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
iresolver "google.golang.org/grpc/internal/resolver"
|
iresolver "google.golang.org/grpc/internal/resolver"
|
||||||
"google.golang.org/grpc/internal/serviceconfig"
|
"google.golang.org/grpc/internal/serviceconfig"
|
||||||
"google.golang.org/grpc/internal/stubserver"
|
"google.golang.org/grpc/internal/stubserver"
|
||||||
|
@ -32,14 +34,15 @@ import (
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
"google.golang.org/grpc/resolver/manual"
|
"google.golang.org/grpc/resolver/manual"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type funcConfigSelector struct {
|
type funcConfigSelector struct {
|
||||||
f func(iresolver.RPCInfo) *iresolver.RPCConfig
|
f func(iresolver.RPCInfo) (*iresolver.RPCConfig, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f funcConfigSelector) SelectConfig(i iresolver.RPCInfo) *iresolver.RPCConfig {
|
func (f funcConfigSelector) SelectConfig(i iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
|
||||||
return f.f(i)
|
return f.f(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,12 +78,14 @@ func (s) TestConfigSelector(t *testing.T) {
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
name string
|
name string
|
||||||
md metadata.MD
|
md metadata.MD // MD sent with RPC
|
||||||
config *iresolver.RPCConfig
|
config *iresolver.RPCConfig // config returned by config selector
|
||||||
|
csErr error // error returned by config selector
|
||||||
|
|
||||||
wantMD metadata.MD
|
wantMD metadata.MD
|
||||||
wantDeadline time.Time
|
wantDeadline time.Time
|
||||||
wantTimeout time.Duration
|
wantTimeout time.Duration
|
||||||
|
wantErr error
|
||||||
}{{
|
}{{
|
||||||
name: "basic",
|
name: "basic",
|
||||||
md: testMD,
|
md: testMD,
|
||||||
|
@ -95,6 +100,10 @@ func (s) TestConfigSelector(t *testing.T) {
|
||||||
},
|
},
|
||||||
wantMD: mdOut,
|
wantMD: mdOut,
|
||||||
wantDeadline: ctxDeadline,
|
wantDeadline: ctxDeadline,
|
||||||
|
}, {
|
||||||
|
name: "erroring SelectConfig",
|
||||||
|
csErr: status.Errorf(codes.Unavailable, "cannot send RPC"),
|
||||||
|
wantErr: status.Errorf(codes.Unavailable, "cannot send RPC"),
|
||||||
}, {
|
}, {
|
||||||
name: "alter timeout; remove MD",
|
name: "alter timeout; remove MD",
|
||||||
md: testMD,
|
md: testMD,
|
||||||
|
@ -138,13 +147,13 @@ func (s) TestConfigSelector(t *testing.T) {
|
||||||
Addresses: []resolver.Address{{Addr: ss.Address}},
|
Addresses: []resolver.Address{{Addr: ss.Address}},
|
||||||
ServiceConfig: parseCfg(ss.R, "{}"),
|
ServiceConfig: parseCfg(ss.R, "{}"),
|
||||||
}, funcConfigSelector{
|
}, funcConfigSelector{
|
||||||
f: func(i iresolver.RPCInfo) *iresolver.RPCConfig {
|
f: func(i iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
|
||||||
gotInfo = &i
|
gotInfo = &i
|
||||||
cfg := tc.config
|
cfg := tc.config
|
||||||
if cfg != nil && cfg.Context == nil {
|
if cfg != nil && cfg.Context == nil {
|
||||||
cfg.Context = i.Context
|
cfg.Context = i.Context
|
||||||
}
|
}
|
||||||
return cfg
|
return cfg, tc.csErr
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
ss.R.UpdateState(state) // Blocks until config selector is applied
|
ss.R.UpdateState(state) // Blocks until config selector is applied
|
||||||
|
@ -152,8 +161,10 @@ func (s) TestConfigSelector(t *testing.T) {
|
||||||
onCommittedCalled = false
|
onCommittedCalled = false
|
||||||
ctx := metadata.NewOutgoingContext(ctx, tc.md)
|
ctx := metadata.NewOutgoingContext(ctx, tc.md)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); fmt.Sprint(err) != fmt.Sprint(tc.wantErr) {
|
||||||
t.Fatalf("client.EmptyCall(_, _) = _, %v; want _, nil", err)
|
t.Fatalf("client.EmptyCall(_, _) = _, %v; want _, %v", err, tc.wantErr)
|
||||||
|
} else if err != nil {
|
||||||
|
return // remaining checks are invalid
|
||||||
}
|
}
|
||||||
|
|
||||||
if gotInfo == nil {
|
if gotInfo == nil {
|
||||||
|
|
Loading…
Reference in New Issue