mirror of https://github.com/grpc/grpc-go.git
interop/xds: Add Custom LB needed for interop test (#6262)
This commit is contained in:
parent
7d6134424a
commit
1536887cc6
|
@ -286,7 +286,7 @@ type PickResult struct {
|
|||
//
|
||||
// LB policies with child policies are responsible for propagating metadata
|
||||
// injected by their children to the ClientConn, as part of Pick().
|
||||
Metatada metadata.MD
|
||||
Metadata metadata.MD
|
||||
}
|
||||
|
||||
// TransientFailureError returns e. It exists for backward compatibility and
|
||||
|
|
|
@ -166,10 +166,10 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
|
|||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
if res.Metatada == nil {
|
||||
res.Metatada = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
|
||||
if res.Metadata == nil {
|
||||
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
|
||||
} else {
|
||||
res.Metatada.Append(rlsDataHeaderName, dcEntry.headerData)
|
||||
res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import (
|
|||
|
||||
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
||||
testpb "google.golang.org/grpc/interop/grpc_testing"
|
||||
_ "google.golang.org/grpc/interop/xds" // to register Custom LB.
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2023 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package xds contains various xds interop helpers for usage in interop tests.
|
||||
package xds
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/internal/pretty"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
)
|
||||
|
||||
func init() {
|
||||
balancer.Register(rpcBehaviorBB{})
|
||||
}
|
||||
|
||||
const name = "test.RpcBehaviorLoadBalancer"
|
||||
|
||||
type rpcBehaviorBB struct{}
|
||||
|
||||
func (rpcBehaviorBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
|
||||
b := &rpcBehaviorLB{
|
||||
ClientConn: cc,
|
||||
}
|
||||
// round_robin child to complete balancer tree with a usable leaf policy and
|
||||
// have RPCs actually work.
|
||||
builder := balancer.Get(roundrobin.Name)
|
||||
if builder == nil {
|
||||
// Shouldn't happen, defensive programming. Registered from import of
|
||||
// roundrobin package.
|
||||
return nil
|
||||
}
|
||||
rr := builder.Build(b, bOpts)
|
||||
if rr == nil {
|
||||
// Shouldn't happen, defensive programming.
|
||||
return nil
|
||||
}
|
||||
b.Balancer = rr
|
||||
return b
|
||||
}
|
||||
|
||||
func (rpcBehaviorBB) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
||||
lbCfg := &lbConfig{}
|
||||
if err := json.Unmarshal(s, lbCfg); err != nil {
|
||||
return nil, fmt.Errorf("rpc-behavior-lb: unable to marshal lbConfig: %s, error: %v", string(s), err)
|
||||
}
|
||||
return lbCfg, nil
|
||||
|
||||
}
|
||||
|
||||
func (rpcBehaviorBB) Name() string {
|
||||
return name
|
||||
}
|
||||
|
||||
type lbConfig struct {
|
||||
serviceconfig.LoadBalancingConfig `json:"-"`
|
||||
RPCBehavior string `json:"rpcBehavior,omitempty"`
|
||||
}
|
||||
|
||||
// rpcBehaviorLB is a load balancer that wraps a round robin balancer and
|
||||
// appends the rpc-behavior metadata field to any metadata in pick results based
|
||||
// on what is specified in configuration.
|
||||
type rpcBehaviorLB struct {
|
||||
// embed a ClientConn to wrap only UpdateState() operation
|
||||
balancer.ClientConn
|
||||
// embed a Balancer to wrap only UpdateClientConnState() operation
|
||||
balancer.Balancer
|
||||
|
||||
mu sync.Mutex
|
||||
cfg *lbConfig
|
||||
}
|
||||
|
||||
func (b *rpcBehaviorLB) UpdateClientConnState(s balancer.ClientConnState) error {
|
||||
lbCfg, ok := s.BalancerConfig.(*lbConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("test.RpcBehaviorLoadBalancer:received config with unexpected type %T: %s", s.BalancerConfig, pretty.ToJSON(s.BalancerConfig))
|
||||
}
|
||||
b.mu.Lock()
|
||||
b.cfg = lbCfg
|
||||
b.mu.Unlock()
|
||||
return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: s.ResolverState,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *rpcBehaviorLB) UpdateState(state balancer.State) {
|
||||
b.mu.Lock()
|
||||
rpcBehavior := b.cfg.RPCBehavior
|
||||
b.mu.Unlock()
|
||||
|
||||
b.ClientConn.UpdateState(balancer.State{
|
||||
ConnectivityState: state.ConnectivityState,
|
||||
Picker: newRPCBehaviorPicker(state.Picker, rpcBehavior),
|
||||
})
|
||||
}
|
||||
|
||||
// rpcBehaviorPicker wraps a picker and adds the rpc-behavior metadata field
|
||||
// into the child pick result's metadata.
|
||||
type rpcBehaviorPicker struct {
|
||||
childPicker balancer.Picker
|
||||
rpcBehavior string
|
||||
}
|
||||
|
||||
// Pick appends the rpc-behavior metadata entry to the pick result of the child.
|
||||
func (p *rpcBehaviorPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
|
||||
pr, err := p.childPicker.Pick(info)
|
||||
if err != nil {
|
||||
return balancer.PickResult{}, err
|
||||
}
|
||||
pr.Metadata = metadata.Join(pr.Metadata, metadata.Pairs("rpc-behavior", p.rpcBehavior))
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
func newRPCBehaviorPicker(childPicker balancer.Picker, rpcBehavior string) *rpcBehaviorPicker {
|
||||
return &rpcBehaviorPicker{
|
||||
childPicker: childPicker,
|
||||
rpcBehavior: rpcBehavior,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,135 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2023 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package xds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
||||
testpb "google.golang.org/grpc/interop/grpc_testing"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/resolver/manual"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
)
|
||||
|
||||
var defaultTestTimeout = 5 * time.Second
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
// TestCustomLB tests the Custom LB for the interop client. It configures the
|
||||
// custom lb as the top level Load Balancing policy of the channel, then asserts
|
||||
// it can successfully make an RPC and also that the rpc behavior the Custom LB
|
||||
// is configured with makes it's way to the server in metadata.
|
||||
func (s) TestCustomLB(t *testing.T) {
|
||||
errCh := testutils.NewChannel()
|
||||
// Setup a backend which verifies the expected rpc-behavior metadata is
|
||||
// present in the request.
|
||||
backend := &stubserver.StubServer{
|
||||
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
errCh.Send(errors.New("failed to receive metadata"))
|
||||
return &testpb.SimpleResponse{}, nil
|
||||
}
|
||||
rpcBMD := md.Get("rpc-behavior")
|
||||
if len(rpcBMD) != 1 {
|
||||
errCh.Send(errors.New("only one value received for metadata key rpc-behavior"))
|
||||
return &testpb.SimpleResponse{}, nil
|
||||
}
|
||||
wantVal := "error-code-0"
|
||||
if rpcBMD[0] != wantVal {
|
||||
errCh.Send(fmt.Errorf("metadata val for key \"rpc-behavior\": got val %v, want val %v", rpcBMD[0], wantVal))
|
||||
return &testpb.SimpleResponse{}, nil
|
||||
}
|
||||
// Success.
|
||||
errCh.Send(nil)
|
||||
return &testpb.SimpleResponse{}, nil
|
||||
},
|
||||
}
|
||||
if err := backend.StartServer(); err != nil {
|
||||
t.Fatalf("Failed to start backend: %v", err)
|
||||
}
|
||||
t.Logf("Started good TestService backend at: %q", backend.Address)
|
||||
defer backend.Stop()
|
||||
|
||||
lbCfgJSON := `{
|
||||
"loadBalancingConfig": [
|
||||
{
|
||||
"test.RpcBehaviorLoadBalancer": {
|
||||
"rpcBehavior": "error-code-0"
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
|
||||
mr := manual.NewBuilderWithScheme("customlb-e2e")
|
||||
defer mr.Close()
|
||||
mr.InitialState(resolver.State{
|
||||
Addresses: []resolver.Address{
|
||||
{Addr: backend.Address},
|
||||
},
|
||||
ServiceConfig: sc,
|
||||
})
|
||||
|
||||
cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
t.Fatalf("grpc.Dial() failed: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
testServiceClient := testgrpc.NewTestServiceClient(cc)
|
||||
|
||||
// Make a Unary RPC. This RPC should be successful due to the round_robin
|
||||
// leaf balancer. Also, the custom load balancer should inject the
|
||||
// "rpc-behavior" string it is configured with into the metadata sent to
|
||||
// server.
|
||||
if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
|
||||
t.Fatalf("EmptyCall() failed: %v", err)
|
||||
}
|
||||
|
||||
val, err := errCh.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error receiving from errCh: %v", err)
|
||||
}
|
||||
|
||||
// Should receive nil on the error channel which implies backend verified it
|
||||
// correctly received the correct "rpc-behavior" metadata.
|
||||
if err, ok := val.(error); ok {
|
||||
t.Fatalf("error in backend verifications on metadata received: %v", err)
|
||||
}
|
||||
}
|
|
@ -472,7 +472,7 @@ func (a *csAttempt) newStream() error {
|
|||
// It is safe to overwrite the csAttempt's context here, since all state
|
||||
// maintained in it are local to the attempt. When the attempt has to be
|
||||
// retried, a new instance of csAttempt will be created.
|
||||
if a.pickResult.Metatada != nil {
|
||||
if a.pickResult.Metadata != nil {
|
||||
// We currently do not have a function it the metadata package which
|
||||
// merges given metadata with existing metadata in a context. Existing
|
||||
// function `AppendToOutgoingContext()` takes a variadic argument of key
|
||||
|
@ -482,7 +482,7 @@ func (a *csAttempt) newStream() error {
|
|||
// in a form passable to AppendToOutgoingContext(), or create a version
|
||||
// of AppendToOutgoingContext() that accepts a metadata.MD.
|
||||
md, _ := metadata.FromOutgoingContext(a.ctx)
|
||||
md = metadata.Join(md, a.pickResult.Metatada)
|
||||
md = metadata.Join(md, a.pickResult.Metadata)
|
||||
a.ctx = metadata.NewOutgoingContext(a.ctx, md)
|
||||
}
|
||||
|
||||
|
|
|
@ -922,10 +922,10 @@ func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, erro
|
|||
return balancer.PickResult{}, err
|
||||
}
|
||||
|
||||
if res.Metatada == nil {
|
||||
res.Metatada = metadata.Pairs(metadataHeaderInjectedByBalancer, metadataValueInjectedByBalancer)
|
||||
if res.Metadata == nil {
|
||||
res.Metadata = metadata.Pairs(metadataHeaderInjectedByBalancer, metadataValueInjectedByBalancer)
|
||||
} else {
|
||||
res.Metatada.Append(metadataHeaderInjectedByBalancer, metadataValueInjectedByBalancer)
|
||||
res.Metadata.Append(metadataHeaderInjectedByBalancer, metadataValueInjectedByBalancer)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue