mirror of https://github.com/grpc/grpc-go.git
xds: Add RLS Cluster Specifier Plugin (#5004)
* xds: Add RLS Cluster Specifier Plugin
This commit is contained in:
parent
50f82701b5
commit
029b822735
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package rls imports to init the rls lb policy for testing purposes.
|
||||
package rls
|
||||
|
||||
import (
|
||||
// Blank import to init the rls lb policy for external use.
|
||||
_ "google.golang.org/grpc/balancer/rls/internal"
|
||||
)
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package rls implements the RLS cluster specifier plugin.
|
||||
package rls
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/internal/envconfig"
|
||||
"google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
||||
"google.golang.org/grpc/xds/internal/clusterspecifier"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
// Blank import to init the RLS LB policy.
|
||||
_ "google.golang.org/grpc/balancer/rls"
|
||||
)
|
||||
|
||||
const rlsBalancerName = "rls_experimental"
|
||||
|
||||
func init() {
|
||||
if envconfig.XDSRLS {
|
||||
clusterspecifier.Register(rls{})
|
||||
}
|
||||
}
|
||||
|
||||
type rls struct{}
|
||||
|
||||
func (rls) TypeURLs() []string {
|
||||
return []string{"type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier"}
|
||||
}
|
||||
|
||||
// lbConfigJSON is the RLS LB Policies configuration in JSON format.
|
||||
// RouteLookupConfig will be a raw JSON string from the passed in proto
|
||||
// configuration, and the other fields will be hardcoded.
|
||||
type lbConfigJSON struct {
|
||||
RouteLookupConfig json.RawMessage `json:"routeLookupConfig"`
|
||||
ChildPolicy []map[string]json.RawMessage `json:"childPolicy"`
|
||||
ChildPolicyConfigTargetFieldName string `json:"childPolicyConfigTargetFieldName"`
|
||||
}
|
||||
|
||||
func (rls) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
|
||||
if cfg == nil {
|
||||
return nil, fmt.Errorf("rls_csp: nil configuration message provided")
|
||||
}
|
||||
any, ok := cfg.(*anypb.Any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("rls_csp: error parsing config %v: unknown type %T", cfg, cfg)
|
||||
}
|
||||
rlcs := new(grpc_lookup_v1.RouteLookupClusterSpecifier)
|
||||
|
||||
if err := ptypes.UnmarshalAny(any, rlcs); err != nil {
|
||||
return nil, fmt.Errorf("rls_csp: error parsing config %v: %v", cfg, err)
|
||||
}
|
||||
rlcJSON, err := protojson.Marshal(rlcs.GetRouteLookupConfig())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rls_csp: error marshaling route lookup config: %v: %v", rlcs.GetRouteLookupConfig(), err)
|
||||
}
|
||||
lbCfgJSON := &lbConfigJSON{
|
||||
RouteLookupConfig: rlcJSON, // "JSON form of RouteLookupClusterSpecifier.config" - RLS in xDS Design Doc
|
||||
ChildPolicy: []map[string]json.RawMessage{
|
||||
{
|
||||
"cds_experimental": json.RawMessage("{}"),
|
||||
},
|
||||
},
|
||||
ChildPolicyConfigTargetFieldName: "cluster",
|
||||
}
|
||||
|
||||
rawJSON, err := json.Marshal(lbCfgJSON)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rls_csp: error marshaling load balancing config %v: %v", lbCfgJSON, err)
|
||||
}
|
||||
|
||||
rlsBB := balancer.Get(rlsBalancerName)
|
||||
if rlsBB == nil {
|
||||
return nil, fmt.Errorf("RLS LB policy not registered")
|
||||
}
|
||||
_, err = rlsBB.(balancer.ConfigParser).ParseConfig(rawJSON)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rls_csp: validation error from rls lb policy parsing %v", err)
|
||||
}
|
||||
|
||||
return clusterspecifier.BalancerConfig{{rlsBalancerName: lbCfgJSON}}, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package rls
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
_ "google.golang.org/grpc/balancer/rls"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
|
||||
"google.golang.org/grpc/xds/internal/clusterspecifier"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
)
|
||||
|
||||
func init() {
|
||||
clusterspecifier.Register(rls{})
|
||||
}
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
// TestParseClusterSpecifierConfig tests the parsing functionality of the RLS
|
||||
// Cluster Specifier Plugin.
|
||||
func (s) TestParseClusterSpecifierConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
rlcs proto.Message
|
||||
wantConfig clusterspecifier.BalancerConfig
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "invalid-rls-cluster-specifier",
|
||||
rlcs: rlsClusterSpecifierConfigError,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "valid-rls-cluster-specifier",
|
||||
rlcs: rlsClusterSpecifierConfigWithoutTransformations,
|
||||
wantConfig: configWithoutTransformationsWant,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
cs := clusterspecifier.Get("type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier")
|
||||
if cs == nil {
|
||||
t.Fatal("Error getting cluster specifier")
|
||||
}
|
||||
lbCfg, err := cs.ParseClusterSpecifierConfig(test.rlcs)
|
||||
|
||||
if (err != nil) != test.wantErr {
|
||||
t.Fatalf("ParseClusterSpecifierConfig(%+v) returned err: %v, wantErr: %v", test.rlcs, err, test.wantErr)
|
||||
}
|
||||
if test.wantErr { // Successfully received an error.
|
||||
return
|
||||
}
|
||||
// Marshal and then unmarshal into interface{} to get rid of
|
||||
// nondeterministic protojson Marshaling.
|
||||
lbCfgJSON, err := json.Marshal(lbCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("json.Marshal(%+v) returned err %v", lbCfg, err)
|
||||
}
|
||||
var got interface{}
|
||||
err = json.Unmarshal(lbCfgJSON, got)
|
||||
if err != nil {
|
||||
t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
|
||||
}
|
||||
wantCfgJSON, err := json.Marshal(test.wantConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("json.Marshal(%+v) returned err %v", test.wantConfig, err)
|
||||
}
|
||||
var want interface{}
|
||||
err = json.Unmarshal(wantCfgJSON, want)
|
||||
if err != nil {
|
||||
t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
|
||||
}
|
||||
if diff := cmp.Diff(want, got, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Fatalf("ParseClusterSpecifierConfig(%+v) returned expected, diff (-want +got) %v", test.rlcs, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This will error because the required match field is set in grpc key builder.
|
||||
var rlsClusterSpecifierConfigError = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
|
||||
RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
|
||||
GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
|
||||
{
|
||||
Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
|
||||
{
|
||||
Service: "service",
|
||||
Method: "method",
|
||||
},
|
||||
},
|
||||
Headers: []*grpc_lookup_v1.NameMatcher{
|
||||
{
|
||||
Key: "k1",
|
||||
RequiredMatch: true,
|
||||
Names: []string{"v1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Corresponds to the rls unit test case in
|
||||
// balancer/rls/internal/config_test.go.
|
||||
var rlsClusterSpecifierConfigWithoutTransformations = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
|
||||
RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
|
||||
GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
|
||||
{
|
||||
Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
|
||||
{
|
||||
Service: "service",
|
||||
Method: "method",
|
||||
},
|
||||
},
|
||||
Headers: []*grpc_lookup_v1.NameMatcher{
|
||||
{
|
||||
Key: "k1",
|
||||
Names: []string{"v1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
LookupService: "target",
|
||||
LookupServiceTimeout: &durationpb.Duration{Seconds: 100},
|
||||
MaxAge: &durationpb.Duration{Seconds: 60},
|
||||
StaleAge: &durationpb.Duration{Seconds: 50},
|
||||
CacheSizeBytes: 1000,
|
||||
DefaultTarget: "passthrough:///default",
|
||||
},
|
||||
})
|
||||
|
||||
var configWithoutTransformationsWant = clusterspecifier.BalancerConfig{{"rls_experimental": &lbConfigJSON{
|
||||
RouteLookupConfig: []byte(`{"grpcKeybuilders":[{"names":[{"service":"service","method":"method"}],"headers":[{"key":"k1","names":["v1"]}]}],"lookupService":"target","lookupServiceTimeout":"100s","maxAge":"60s","staleAge":"50s","cacheSizeBytes":"1000","defaultTarget":"passthrough:///default"}`),
|
||||
ChildPolicy: []map[string]json.RawMessage{
|
||||
{
|
||||
"cds_experimental": []byte(`{}`),
|
||||
},
|
||||
},
|
||||
ChildPolicyConfigTargetFieldName: "cluster",
|
||||
}}}
|
||||
Loading…
Reference in New Issue