mirror of https://github.com/grpc/grpc-go.git
xds/federation: support federation in LRS (#5128)
This commit is contained in:
parent
61a6a06b88
commit
0a68f8aff0
|
@ -318,12 +318,20 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
|
|||
EDSServiceName: cu.EDSServiceName,
|
||||
MaxConcurrentRequests: cu.MaxRequests,
|
||||
}
|
||||
if cu.EnableLRS {
|
||||
// An empty string here indicates that the cluster_resolver balancer should use the
|
||||
// same xDS server for load reporting as it does for EDS
|
||||
// requests/responses.
|
||||
dms[i].LoadReportingServerName = new(string)
|
||||
|
||||
if cu.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
|
||||
bootstrapConfig := b.xdsClient.BootstrapConfig()
|
||||
parsedName := xdsresource.ParseName(cu.ClusterName)
|
||||
if parsedName.Scheme == xdsresource.FederationScheme {
|
||||
// Is a federation resource name, find the corresponding
|
||||
// authority server config.
|
||||
if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
|
||||
dms[i].LoadReportingServer = cfg.XDSServer
|
||||
}
|
||||
} else {
|
||||
// Not a federation resource name, use the default
|
||||
// authority.
|
||||
dms[i].LoadReportingServer = bootstrapConfig.XDSServer
|
||||
}
|
||||
}
|
||||
case xdsresource.ClusterTypeLogicalDNS:
|
||||
dms[i] = clusterresolver.DiscoveryMechanism{
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"google.golang.org/grpc/xds/internal/balancer/ringhash"
|
||||
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
||||
)
|
||||
|
||||
|
@ -48,6 +49,11 @@ const (
|
|||
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
|
||||
)
|
||||
|
||||
var defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{
|
||||
ServerURI: "self_server",
|
||||
CredsType: "self_creds",
|
||||
}
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
@ -209,8 +215,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *inter
|
|||
MaxConcurrentRequests: countMax,
|
||||
}
|
||||
if enableLRS {
|
||||
discoveryMechanism.LoadReportingServerName = new(string)
|
||||
|
||||
discoveryMechanism.LoadReportingServer = defaultTestAuthorityServerConfig
|
||||
}
|
||||
lbCfg := &clusterresolver.LBConfig{
|
||||
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism},
|
||||
|
@ -354,6 +359,9 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
|
|||
// to the edsBalancer.
|
||||
func (s) TestHandleClusterUpdate(t *testing.T) {
|
||||
xdsC, cdsB, edsB, _, cancel := setupWithWatch(t)
|
||||
xdsC.SetBootstrapConfig(&bootstrap.Config{
|
||||
XDSServer: defaultTestAuthorityServerConfig,
|
||||
})
|
||||
defer func() {
|
||||
cancel()
|
||||
cdsB.Close()
|
||||
|
@ -367,7 +375,7 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
name: "happy-case-with-lrs",
|
||||
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, EnableLRS: true},
|
||||
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf},
|
||||
wantCCS: edsCCS(serviceName, nil, true, nil),
|
||||
},
|
||||
{
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/load"
|
||||
)
|
||||
|
||||
|
@ -48,15 +49,18 @@ const (
|
|||
defaultTestTimeout = 1 * time.Second
|
||||
defaultShortTestTimeout = 100 * time.Microsecond
|
||||
|
||||
testClusterName = "test-cluster"
|
||||
testServiceName = "test-eds-service"
|
||||
testLRSServerName = "test-lrs-name"
|
||||
testClusterName = "test-cluster"
|
||||
testServiceName = "test-eds-service"
|
||||
)
|
||||
|
||||
var (
|
||||
testBackendAddrs = []resolver.Address{
|
||||
{Addr: "1.1.1.1:1"},
|
||||
}
|
||||
testLRSServerConfig = &bootstrap.ServerConfig{
|
||||
ServerURI: "trafficdirector.googleapis.com:443",
|
||||
CredsType: "google_default",
|
||||
}
|
||||
|
||||
cmpOpts = cmp.Options{
|
||||
cmpopts.EquateEmpty(),
|
||||
|
@ -103,9 +107,9 @@ func (s) TestDropByCategory(t *testing.T) {
|
|||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: newString(testLRSServerName),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
DropCategories: []DropConfig{{
|
||||
Category: dropReason,
|
||||
RequestsPerMillion: million * dropNumerator / dropDenominator,
|
||||
|
@ -125,8 +129,8 @@ func (s) TestDropByCategory(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
|
||||
}
|
||||
if got.Server != testLRSServerName {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
|
||||
if got.Server != testLRSServerConfig {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
|
||||
}
|
||||
|
||||
sc1 := <-cc.NewSubConnCh
|
||||
|
@ -191,9 +195,9 @@ func (s) TestDropByCategory(t *testing.T) {
|
|||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: newString(testLRSServerName),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
DropCategories: []DropConfig{{
|
||||
Category: dropReason2,
|
||||
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
|
||||
|
@ -257,10 +261,10 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
|
|||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: newString(testLRSServerName),
|
||||
MaxConcurrentRequests: &maxRequest,
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: &maxRequest,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
},
|
||||
|
@ -276,8 +280,8 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
|
||||
}
|
||||
if got.Server != testLRSServerName {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
|
||||
if got.Server != testLRSServerConfig {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
|
||||
}
|
||||
|
||||
sc1 := <-cc.NewSubConnCh
|
||||
|
@ -605,9 +609,9 @@ func (s) TestLoadReporting(t *testing.T) {
|
|||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: newString(testLRSServerName),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
// Locality: testLocality,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
|
@ -624,8 +628,8 @@ func (s) TestLoadReporting(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
|
||||
}
|
||||
if got.Server != testLRSServerName {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
|
||||
if got.Server != testLRSServerConfig {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
|
||||
}
|
||||
|
||||
sc1 := <-cc.NewSubConnCh
|
||||
|
@ -720,9 +724,9 @@ func (s) TestUpdateLRSServer(t *testing.T) {
|
|||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: newString(""),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
},
|
||||
|
@ -738,17 +742,21 @@ func (s) TestUpdateLRSServer(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
|
||||
}
|
||||
if got.Server != "" {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "")
|
||||
if got.Server != testLRSServerConfig {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
|
||||
}
|
||||
|
||||
testLRSServerConfig2 := &bootstrap.ServerConfig{
|
||||
ServerURI: "trafficdirector-another.googleapis.com:443",
|
||||
CredsType: "google_default",
|
||||
}
|
||||
// Update LRS server to a different name.
|
||||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: newString(testLRSServerName),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServer: testLRSServerConfig2,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
},
|
||||
|
@ -763,17 +771,16 @@ func (s) TestUpdateLRSServer(t *testing.T) {
|
|||
if err2 != nil {
|
||||
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
|
||||
}
|
||||
if got2.Server != testLRSServerName {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName)
|
||||
if got2.Server != testLRSServerConfig2 {
|
||||
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
|
||||
}
|
||||
|
||||
// Update LRS server to nil, to disable LRS.
|
||||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
|
||||
BalancerConfig: &LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
LoadReportingServerName: nil,
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testServiceName,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
},
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/balancer/loadstore"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/load"
|
||||
)
|
||||
|
||||
|
@ -104,7 +105,7 @@ type clusterImplBalancer struct {
|
|||
childLB balancer.Balancer
|
||||
cancelLoadReport func()
|
||||
edsServiceName string
|
||||
lrsServerName *string
|
||||
lrsServer *bootstrap.ServerConfig
|
||||
loadWrapper *loadstore.Wrapper
|
||||
|
||||
clusterNameMu sync.Mutex
|
||||
|
@ -171,22 +172,22 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
|
|||
)
|
||||
|
||||
// Check if it's necessary to restart load report.
|
||||
if b.lrsServerName == nil {
|
||||
if newConfig.LoadReportingServerName != nil {
|
||||
if b.lrsServer == nil {
|
||||
if newConfig.LoadReportingServer != nil {
|
||||
// Old is nil, new is not nil, start new LRS.
|
||||
b.lrsServerName = newConfig.LoadReportingServerName
|
||||
b.lrsServer = newConfig.LoadReportingServer
|
||||
startNewLoadReport = true
|
||||
}
|
||||
// Old is nil, new is nil, do nothing.
|
||||
} else if newConfig.LoadReportingServerName == nil {
|
||||
} else if newConfig.LoadReportingServer == nil {
|
||||
// Old is not nil, new is nil, stop old, don't start new.
|
||||
b.lrsServerName = newConfig.LoadReportingServerName
|
||||
b.lrsServer = newConfig.LoadReportingServer
|
||||
stopOldLoadReport = true
|
||||
} else {
|
||||
// Old is not nil, new is not nil, compare string values, if
|
||||
// different, stop old and start new.
|
||||
if *b.lrsServerName != *newConfig.LoadReportingServerName {
|
||||
b.lrsServerName = newConfig.LoadReportingServerName
|
||||
if *b.lrsServer != *newConfig.LoadReportingServer {
|
||||
b.lrsServer = newConfig.LoadReportingServer
|
||||
stopOldLoadReport = true
|
||||
startNewLoadReport = true
|
||||
}
|
||||
|
@ -206,7 +207,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
|
|||
if startNewLoadReport {
|
||||
var loadStore *load.Store
|
||||
if b.xdsClient != nil {
|
||||
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(*b.lrsServerName)
|
||||
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
|
||||
}
|
||||
b.loadWrapper.UpdateLoadStore(loadStore)
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
)
|
||||
|
||||
// DropConfig contains the category, and drop ratio.
|
||||
|
@ -35,12 +36,14 @@ type DropConfig struct {
|
|||
type LBConfig struct {
|
||||
serviceconfig.LoadBalancingConfig `json:"-"`
|
||||
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
EDSServiceName string `json:"edsServiceName,omitempty"`
|
||||
LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"`
|
||||
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
|
||||
DropCategories []DropConfig `json:"dropCategories,omitempty"`
|
||||
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
EDSServiceName string `json:"edsServiceName,omitempty"`
|
||||
// LoadReportingServer is the LRS server to send load reports to. If not
|
||||
// present, load reporting will be disabled.
|
||||
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
|
||||
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
|
||||
DropCategories []DropConfig `json:"dropCategories,omitempty"`
|
||||
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
|
||||
}
|
||||
|
||||
func parseConfig(c json.RawMessage) (*LBConfig, error) {
|
||||
|
|
|
@ -22,17 +22,22 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"google.golang.org/grpc/balancer"
|
||||
_ "google.golang.org/grpc/balancer/roundrobin"
|
||||
_ "google.golang.org/grpc/balancer/weightedtarget"
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
)
|
||||
|
||||
const (
|
||||
testJSONConfig = `{
|
||||
"cluster": "test_cluster",
|
||||
"edsServiceName": "test-eds",
|
||||
"lrsLoadReportingServerName": "lrs_server",
|
||||
"lrsLoadReportingServer": {
|
||||
"server_uri": "trafficdirector.googleapis.com:443",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
},
|
||||
"maxConcurrentRequests": 123,
|
||||
"dropCategories": [
|
||||
{
|
||||
|
@ -106,10 +111,10 @@ func TestParseConfig(t *testing.T) {
|
|||
name: "OK",
|
||||
js: testJSONConfig,
|
||||
want: &LBConfig{
|
||||
Cluster: "test_cluster",
|
||||
EDSServiceName: "test-eds",
|
||||
LoadReportingServerName: newString("lrs_server"),
|
||||
MaxConcurrentRequests: newUint32(123),
|
||||
Cluster: "test_cluster",
|
||||
EDSServiceName: "test-eds",
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(123),
|
||||
DropCategories: []DropConfig{
|
||||
{Category: "drop-1", RequestsPerMillion: 314},
|
||||
{Category: "drop-2", RequestsPerMillion: 159},
|
||||
|
@ -128,17 +133,13 @@ func TestParseConfig(t *testing.T) {
|
|||
if (err != nil) != tt.wantErr {
|
||||
t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if !cmp.Equal(got, tt.want) {
|
||||
if !cmp.Equal(got, tt.want, cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds")) {
|
||||
t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newString(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func newUint32(i uint32) *uint32 {
|
||||
return &i
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
"google.golang.org/grpc/xds/internal/balancer/ringhash"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
)
|
||||
|
||||
// DiscoveryMechanismType is the type of discovery mechanism.
|
||||
|
@ -84,11 +85,9 @@ func (t *DiscoveryMechanismType) UnmarshalJSON(b []byte) error {
|
|||
type DiscoveryMechanism struct {
|
||||
// Cluster is the cluster name.
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
// LoadReportingServerName is the LRS server to send load reports to. If
|
||||
// not present, load reporting will be disabled. If set to the empty string,
|
||||
// load reporting will be sent to the same server that we obtained CDS data
|
||||
// from.
|
||||
LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"`
|
||||
// LoadReportingServer is the LRS server to send load reports to. If not
|
||||
// present, load reporting will be disabled.
|
||||
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
|
||||
// MaxConcurrentRequests is the maximum number of outstanding requests can
|
||||
// be made to the upstream cluster. Default is 1024.
|
||||
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
|
||||
|
@ -110,8 +109,6 @@ func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool {
|
|||
switch {
|
||||
case dm.Cluster != b.Cluster:
|
||||
return false
|
||||
case !equalStringP(dm.LoadReportingServerName, b.LoadReportingServerName):
|
||||
return false
|
||||
case !equalUint32P(dm.MaxConcurrentRequests, b.MaxConcurrentRequests):
|
||||
return false
|
||||
case dm.Type != b.Type:
|
||||
|
@ -121,17 +118,14 @@ func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool {
|
|||
case dm.DNSHostname != b.DNSHostname:
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func equalStringP(a, b *string) bool {
|
||||
if a == nil && b == nil {
|
||||
if dm.LoadReportingServer == nil && b.LoadReportingServer == nil {
|
||||
return true
|
||||
}
|
||||
if a == nil || b == nil {
|
||||
if (dm.LoadReportingServer != nil) != (b.LoadReportingServer != nil) {
|
||||
return false
|
||||
}
|
||||
return *a == *b
|
||||
return dm.LoadReportingServer.String() == b.LoadReportingServer.String()
|
||||
}
|
||||
|
||||
func equalUint32P(a, b *uint32) bool {
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"google.golang.org/grpc/internal/balancer/stub"
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/xds/internal/balancer/ringhash"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
)
|
||||
|
||||
func TestDiscoveryMechanismTypeMarshalJSON(t *testing.T) {
|
||||
|
@ -102,7 +103,10 @@ const (
|
|||
testJSONConfig1 = `{
|
||||
"discoveryMechanisms": [{
|
||||
"cluster": "test-cluster-name",
|
||||
"lrsLoadReportingServerName": "test-lrs-server",
|
||||
"lrsLoadReportingServer": {
|
||||
"server_uri": "trafficdirector.googleapis.com:443",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
},
|
||||
"maxConcurrentRequests": 314,
|
||||
"type": "EDS",
|
||||
"edsServiceName": "test-eds-service-name"
|
||||
|
@ -111,7 +115,10 @@ const (
|
|||
testJSONConfig2 = `{
|
||||
"discoveryMechanisms": [{
|
||||
"cluster": "test-cluster-name",
|
||||
"lrsLoadReportingServerName": "test-lrs-server",
|
||||
"lrsLoadReportingServer": {
|
||||
"server_uri": "trafficdirector.googleapis.com:443",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
},
|
||||
"maxConcurrentRequests": 314,
|
||||
"type": "EDS",
|
||||
"edsServiceName": "test-eds-service-name"
|
||||
|
@ -122,7 +129,10 @@ const (
|
|||
testJSONConfig3 = `{
|
||||
"discoveryMechanisms": [{
|
||||
"cluster": "test-cluster-name",
|
||||
"lrsLoadReportingServerName": "test-lrs-server",
|
||||
"lrsLoadReportingServer": {
|
||||
"server_uri": "trafficdirector.googleapis.com:443",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
},
|
||||
"maxConcurrentRequests": 314,
|
||||
"type": "EDS",
|
||||
"edsServiceName": "test-eds-service-name"
|
||||
|
@ -132,7 +142,10 @@ const (
|
|||
testJSONConfig4 = `{
|
||||
"discoveryMechanisms": [{
|
||||
"cluster": "test-cluster-name",
|
||||
"lrsLoadReportingServerName": "test-lrs-server",
|
||||
"lrsLoadReportingServer": {
|
||||
"server_uri": "trafficdirector.googleapis.com:443",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
},
|
||||
"maxConcurrentRequests": 314,
|
||||
"type": "EDS",
|
||||
"edsServiceName": "test-eds-service-name"
|
||||
|
@ -142,7 +155,10 @@ const (
|
|||
testJSONConfig5 = `{
|
||||
"discoveryMechanisms": [{
|
||||
"cluster": "test-cluster-name",
|
||||
"lrsLoadReportingServerName": "test-lrs-server",
|
||||
"lrsLoadReportingServer": {
|
||||
"server_uri": "trafficdirector.googleapis.com:443",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
},
|
||||
"maxConcurrentRequests": 314,
|
||||
"type": "EDS",
|
||||
"edsServiceName": "test-eds-service-name"
|
||||
|
@ -151,6 +167,11 @@ const (
|
|||
}`
|
||||
)
|
||||
|
||||
var testLRSServerConfig = &bootstrap.ServerConfig{
|
||||
ServerURI: "trafficdirector.googleapis.com:443",
|
||||
CredsType: "google_default",
|
||||
}
|
||||
|
||||
func TestParseConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -170,11 +191,11 @@ func TestParseConfig(t *testing.T) {
|
|||
want: &LBConfig{
|
||||
DiscoveryMechanisms: []DiscoveryMechanism{
|
||||
{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
},
|
||||
},
|
||||
XDSLBPolicy: nil,
|
||||
|
@ -187,11 +208,11 @@ func TestParseConfig(t *testing.T) {
|
|||
want: &LBConfig{
|
||||
DiscoveryMechanisms: []DiscoveryMechanism{
|
||||
{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
},
|
||||
{
|
||||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
|
@ -207,11 +228,11 @@ func TestParseConfig(t *testing.T) {
|
|||
want: &LBConfig{
|
||||
DiscoveryMechanisms: []DiscoveryMechanism{
|
||||
{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
},
|
||||
},
|
||||
XDSLBPolicy: &internalserviceconfig.BalancerConfig{
|
||||
|
@ -227,11 +248,11 @@ func TestParseConfig(t *testing.T) {
|
|||
want: &LBConfig{
|
||||
DiscoveryMechanisms: []DiscoveryMechanism{
|
||||
{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServcie,
|
||||
},
|
||||
},
|
||||
XDSLBPolicy: &internalserviceconfig.BalancerConfig{
|
||||
|
|
|
@ -257,11 +257,11 @@ var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Na
|
|||
// addresses.
|
||||
func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
|
||||
clusterImplCfg := &clusterimpl.LBConfig{
|
||||
Cluster: mechanism.Cluster,
|
||||
EDSServiceName: mechanism.EDSServiceName,
|
||||
LoadReportingServerName: mechanism.LoadReportingServerName,
|
||||
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
|
||||
DropCategories: drops,
|
||||
Cluster: mechanism.Cluster,
|
||||
EDSServiceName: mechanism.EDSServiceName,
|
||||
LoadReportingServer: mechanism.LoadReportingServer,
|
||||
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
|
||||
DropCategories: drops,
|
||||
// ChildPolicy is not set. Will be set based on xdsLBPolicy
|
||||
}
|
||||
|
||||
|
|
|
@ -125,11 +125,11 @@ func TestBuildPriorityConfigJSON(t *testing.T) {
|
|||
gotConfig, _, err := buildPriorityConfigJSON([]priorityConfig{
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
},
|
||||
edsResp: xdsresource.EndpointsUpdate{
|
||||
Drops: []xdsresource.OverloadDropConfig{
|
||||
|
@ -175,11 +175,11 @@ func TestBuildPriorityConfig(t *testing.T) {
|
|||
gotConfig, gotAddrs, _ := buildPriorityConfig([]priorityConfig{
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
},
|
||||
edsResp: xdsresource.EndpointsUpdate{
|
||||
Drops: []xdsresource.OverloadDropConfig{
|
||||
|
@ -212,10 +212,10 @@ func TestBuildPriorityConfig(t *testing.T) {
|
|||
Config: &internalserviceconfig.BalancerConfig{
|
||||
Name: clusterimpl.Name,
|
||||
Config: &clusterimpl.LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
|
@ -245,10 +245,10 @@ func TestBuildPriorityConfig(t *testing.T) {
|
|||
Config: &internalserviceconfig.BalancerConfig{
|
||||
Name: clusterimpl.Name,
|
||||
Config: &clusterimpl.LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
|
@ -369,11 +369,11 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
|
|||
},
|
||||
},
|
||||
DiscoveryMechanism{
|
||||
Cluster: testClusterName,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
Cluster: testClusterName,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
@ -384,10 +384,10 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
|
|||
}
|
||||
wantConfigs := map[string]*clusterimpl.LBConfig{
|
||||
"priority-2-0": {
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
|
@ -411,10 +411,10 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
|
|||
},
|
||||
},
|
||||
"priority-2-1": {
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServer: testLRSServerConfig,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
|
|
|
@ -242,11 +242,11 @@ func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) {
|
|||
// ReportLoadArgs wraps the arguments passed to ReportLoad.
|
||||
type ReportLoadArgs struct {
|
||||
// Server is the name of the server to which the load is reported.
|
||||
Server string
|
||||
Server *bootstrap.ServerConfig
|
||||
}
|
||||
|
||||
// ReportLoad starts reporting load about clusterName to server.
|
||||
func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) {
|
||||
func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore *load.Store, cancel func()) {
|
||||
xdsC.loadReportCh.Send(ReportLoadArgs{Server: server})
|
||||
return xdsC.loadStore, func() {
|
||||
xdsC.lrsCancelCh.Send(nil)
|
||||
|
|
|
@ -36,7 +36,7 @@ type XDSClient interface {
|
|||
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
|
||||
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()
|
||||
WatchEndpoints(clusterName string, edsCb func(xdsresource.EndpointsUpdate, error)) (cancel func())
|
||||
ReportLoad(server string) (*load.Store, func())
|
||||
ReportLoad(server *bootstrap.ServerConfig) (*load.Store, func())
|
||||
|
||||
DumpLDS() map[string]xdsresource.UpdateWithMD
|
||||
DumpRDS() map[string]xdsresource.UpdateWithMD
|
||||
|
|
|
@ -27,8 +27,6 @@ import (
|
|||
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
||||
)
|
||||
|
||||
const federationScheme = "xdstp"
|
||||
|
||||
// findAuthority returns the authority for this name. If it doesn't already
|
||||
// exist, one will be created.
|
||||
//
|
||||
|
@ -49,7 +47,7 @@ func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref fun
|
|||
}
|
||||
|
||||
config := c.config.XDSServer
|
||||
if scheme == federationScheme {
|
||||
if scheme == xdsresource.FederationScheme {
|
||||
cfg, ok := c.config.Authorities[authority]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("xds: failed to find authority %q", authority)
|
||||
|
@ -78,6 +76,9 @@ func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref fun
|
|||
// newAuthority creates a new authority for the config. But before that, it
|
||||
// checks the cache to see if an authority for this config already exists.
|
||||
//
|
||||
// The caller must take a reference of the returned authority before using, and
|
||||
// unref afterwards.
|
||||
//
|
||||
// caller must hold c.authorityMu
|
||||
func (c *clientImpl) newAuthority(config *bootstrap.ServerConfig) (_ *authority, retErr error) {
|
||||
// First check if there's already an authority for this config. If found, it
|
||||
|
@ -219,8 +220,12 @@ func (a *authority) watchEndpoints(clusterName string, cb func(xdsresource.Endpo
|
|||
}
|
||||
}
|
||||
|
||||
func (a *authority) reportLoad(server string) (*load.Store, func()) {
|
||||
return a.controller.ReportLoad(server)
|
||||
func (a *authority) reportLoad() (*load.Store, func()) {
|
||||
// An empty string means to report load to the same same used for ADS. There
|
||||
// should never be a need to specify a string other than an empty string. If
|
||||
// a different server is to be used, a different authority (controller) will
|
||||
// be created.
|
||||
return a.controller.ReportLoad("")
|
||||
}
|
||||
|
||||
func (a *authority) dump(t xdsresource.ResourceType) map[string]xdsresource.UpdateWithMD {
|
||||
|
|
|
@ -107,19 +107,26 @@ func (sc *ServerConfig) String() string {
|
|||
return strings.Join([]string{sc.ServerURI, sc.CredsType, ver}, "-")
|
||||
}
|
||||
|
||||
// UnmarshalJSON takes the json data (a list of servers) and unmarshals the
|
||||
// first one in the list.
|
||||
// MarshalJSON marshals the ServerConfig to json.
|
||||
func (sc ServerConfig) MarshalJSON() ([]byte, error) {
|
||||
server := xdsServer{
|
||||
ServerURI: sc.ServerURI,
|
||||
ChannelCreds: []channelCreds{{Type: sc.CredsType, Config: nil}},
|
||||
}
|
||||
if sc.TransportAPI == version.TransportV3 {
|
||||
server.ServerFeatures = []string{serverFeaturesV3}
|
||||
}
|
||||
return json.Marshal(server)
|
||||
}
|
||||
|
||||
// UnmarshalJSON takes the json data (a server) and unmarshals it to the struct.
|
||||
func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
|
||||
var servers []*xdsServer
|
||||
if err := json.Unmarshal(data, &servers); err != nil {
|
||||
return fmt.Errorf("xds: json.Unmarshal(data) for field xds_servers failed during bootstrap: %v", err)
|
||||
var server xdsServer
|
||||
if err := json.Unmarshal(data, &server); err != nil {
|
||||
return fmt.Errorf("xds: json.Unmarshal(data) for field ServerConfig failed during bootstrap: %v", err)
|
||||
}
|
||||
if len(servers) < 1 {
|
||||
return fmt.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any management server to connect to")
|
||||
}
|
||||
xs := servers[0]
|
||||
sc.ServerURI = xs.ServerURI
|
||||
for _, cc := range xs.ChannelCreds {
|
||||
sc.ServerURI = server.ServerURI
|
||||
for _, cc := range server.ChannelCreds {
|
||||
// We stop at the first credential type that we support.
|
||||
sc.CredsType = cc.Type
|
||||
if cc.Type == credsGoogleDefault {
|
||||
|
@ -130,7 +137,7 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
|
|||
break
|
||||
}
|
||||
}
|
||||
for _, f := range xs.ServerFeatures {
|
||||
for _, f := range server.ServerFeatures {
|
||||
if f == serverFeaturesV3 {
|
||||
sc.TransportAPI = version.TransportV3
|
||||
}
|
||||
|
@ -138,6 +145,18 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// unmarshalJSONServerConfigSlice unmarshals JSON to a slice.
|
||||
func unmarshalJSONServerConfigSlice(data []byte) ([]*ServerConfig, error) {
|
||||
var servers []*ServerConfig
|
||||
if err := json.Unmarshal(data, &servers); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal JSON to []*ServerConfig: %v", err)
|
||||
}
|
||||
if len(servers) < 1 {
|
||||
return nil, fmt.Errorf("no management server found in JSON")
|
||||
}
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
// Authority contains configuration for an Authority for an xDS control plane
|
||||
// server. See the Authorities field in the Config struct for how it's used.
|
||||
type Authority struct {
|
||||
|
@ -170,9 +189,11 @@ func (a *Authority) UnmarshalJSON(data []byte) error {
|
|||
for k, v := range jsonData {
|
||||
switch k {
|
||||
case "xds_servers":
|
||||
if err := json.Unmarshal(v, &a.XDSServer); err != nil {
|
||||
return fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
|
||||
servers, err := unmarshalJSONServerConfigSlice(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("xds: json.Unmarshal(data) for field %q failed during bootstrap: %v", k, err)
|
||||
}
|
||||
a.XDSServer = servers[0]
|
||||
case "client_listener_resource_name_template":
|
||||
if err := json.Unmarshal(v, &a.ClientListenerResourceNameTemplate); err != nil {
|
||||
return fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
|
||||
|
@ -243,7 +264,7 @@ type Config struct {
|
|||
|
||||
type channelCreds struct {
|
||||
Type string `json:"type"`
|
||||
Config json.RawMessage `json:"config"`
|
||||
Config json.RawMessage `json:"config,omitempty"`
|
||||
}
|
||||
|
||||
type xdsServer struct {
|
||||
|
@ -325,9 +346,11 @@ func NewConfigFromContents(data []byte) (*Config, error) {
|
|||
return nil, fmt.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
|
||||
}
|
||||
case "xds_servers":
|
||||
if err := json.Unmarshal(v, &config.XDSServer); err != nil {
|
||||
return nil, fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
|
||||
servers, err := unmarshalJSONServerConfigSlice(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: json.Unmarshal(data) for field %q failed during bootstrap: %v", k, err)
|
||||
}
|
||||
config.XDSServer = servers[0]
|
||||
case "certificate_providers":
|
||||
var providerInstances map[string]json.RawMessage
|
||||
if err := json.Unmarshal(v, &providerInstances); err != nil {
|
||||
|
|
|
@ -994,3 +994,24 @@ func TestNewConfigWithFederation(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerConfigMarshalAndUnmarshal(t *testing.T) {
|
||||
c := ServerConfig{
|
||||
ServerURI: "test-server",
|
||||
Creds: nil,
|
||||
CredsType: "test-creds",
|
||||
TransportAPI: version.TransportV3,
|
||||
}
|
||||
|
||||
bs, err := json.Marshal(c)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to marshal: %v", err)
|
||||
}
|
||||
var cUnmarshal ServerConfig
|
||||
if err := json.Unmarshal(bs, &cUnmarshal); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if diff := cmp.Diff(cUnmarshal, c); diff != "" {
|
||||
t.Fatalf("diff (-got +want): %v", diff)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,9 +35,10 @@ import (
|
|||
// It returns a Store for the user to report loads, a function to cancel the
|
||||
// load reporting stream.
|
||||
//
|
||||
// TODO: LRS refactor; maybe a new controller should be created for a separate
|
||||
// server, so that the same stream can be shared by different reporters to the
|
||||
// same server, even if they originate from different Controllers.
|
||||
// TODO(xdsfed): LRS refactor, delete the parameter of this function, and
|
||||
// cleanup the multiple LRS ClientConn code. Each controller should have one
|
||||
// ClientConn to the authority it's created for, all LRS streams (and ADS
|
||||
// streams) in this controller should all share that ClientConn.
|
||||
func (c *Controller) ReportLoad(server string) (*load.Store, func()) {
|
||||
c.lrsMu.Lock()
|
||||
defer c.lrsMu.Unlock()
|
||||
|
|
|
@ -138,7 +138,7 @@ func (s) TestCDSHandleResponse(t *testing.T) {
|
|||
cdsResponse: goodCDSResponse1,
|
||||
wantErr: false,
|
||||
wantUpdate: map[string]xdsresource.ClusterUpdateErrTuple{
|
||||
goodClusterName1: {Update: xdsresource.ClusterUpdate{ClusterName: goodClusterName1, EDSServiceName: serviceName1, EnableLRS: true, Raw: marshaledCluster1}},
|
||||
goodClusterName1: {Update: xdsresource.ClusterUpdate{ClusterName: goodClusterName1, EDSServiceName: serviceName1, LRSServerConfig: xdsresource.ClusterLRSServerSelf, Raw: marshaledCluster1}},
|
||||
},
|
||||
wantUpdateMD: xdsresource.UpdateMetadata{
|
||||
Status: xdsresource.ServiceStatusACKed,
|
||||
|
|
|
@ -18,30 +18,26 @@
|
|||
package xdsclient
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/load"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
||||
)
|
||||
|
||||
// ReportLoad starts an load reporting stream to the given server. If the server
|
||||
// is not an empty string, and is different from the management server, a new
|
||||
// ClientConn will be created.
|
||||
//
|
||||
// The same options used for creating the Client will be used (including
|
||||
// NodeProto, and dial options if necessary).
|
||||
// ReportLoad starts a load reporting stream to the given server. All load
|
||||
// reports to the same server share the LRS stream.
|
||||
//
|
||||
// It returns a Store for the user to report loads, a function to cancel the
|
||||
// load reporting stream.
|
||||
func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) {
|
||||
// TODO: load reporting with federation also needs find the authority for
|
||||
// this server first, then reports load to it. Currently always report to
|
||||
// the default authority. This is needed to avoid a nil pointer panic.
|
||||
a, unref, err := c.findAuthority(xdsresource.ParseName(""))
|
||||
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*load.Store, func()) {
|
||||
a, err := c.newAuthority(server)
|
||||
if err != nil {
|
||||
c.logger.Infof("xds: failed to connect to the control plane to do load reporting for authority %q: %v", server, err)
|
||||
return nil, func() {}
|
||||
}
|
||||
store, cancelF := a.reportLoad(server)
|
||||
// Hold the ref before starting load reporting.
|
||||
a.ref()
|
||||
store, cancelF := a.reportLoad()
|
||||
return store, func() {
|
||||
cancelF()
|
||||
unref()
|
||||
c.unrefAuthority(a)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,15 @@ func (s) TestLRSClient(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// Report to the same address should not create new ClientConn.
|
||||
store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
|
||||
store1, lrsCancel1 := xdsC.ReportLoad(
|
||||
&bootstrap.ServerConfig{
|
||||
ServerURI: fs.Address,
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
CredsType: "insecure",
|
||||
TransportAPI: version.TransportV2,
|
||||
NodeProto: &v2corepb.Node{},
|
||||
},
|
||||
)
|
||||
defer lrsCancel1()
|
||||
|
||||
if u, err := fs.NewConnChan.Receive(ctx); err != nil {
|
||||
|
@ -87,7 +95,15 @@ func (s) TestLRSClient(t *testing.T) {
|
|||
defer sCleanup2()
|
||||
|
||||
// Report to a different address should create new ClientConn.
|
||||
store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
|
||||
store2, lrsCancel2 := xdsC.ReportLoad(
|
||||
&bootstrap.ServerConfig{
|
||||
ServerURI: fs2.Address,
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
CredsType: "insecure",
|
||||
TransportAPI: version.TransportV2,
|
||||
NodeProto: &v2corepb.Node{},
|
||||
},
|
||||
)
|
||||
defer lrsCancel2()
|
||||
if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
|
||||
t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
|
||||
|
|
|
@ -25,6 +25,9 @@ import (
|
|||
"google.golang.org/grpc/internal/envconfig"
|
||||
)
|
||||
|
||||
// FederationScheme is the scheme of a federation resource name.
|
||||
const FederationScheme = "xdstp"
|
||||
|
||||
// Name contains the parsed component of an xDS resource name.
|
||||
//
|
||||
// An xDS resource name is in the format of
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
|
||||
package xdsresource
|
||||
|
||||
import "google.golang.org/protobuf/types/known/anypb"
|
||||
import (
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
)
|
||||
|
||||
// ClusterType is the type of cluster from a received CDS response.
|
||||
type ClusterType int
|
||||
|
@ -35,6 +37,18 @@ const (
|
|||
ClusterTypeAggregate
|
||||
)
|
||||
|
||||
// ClusterLRSServerConfigType is the type of LRS server config.
|
||||
type ClusterLRSServerConfigType int
|
||||
|
||||
const (
|
||||
// ClusterLRSOff indicates LRS is off (loads are not reported for this
|
||||
// cluster).
|
||||
ClusterLRSOff ClusterLRSServerConfigType = iota
|
||||
// ClusterLRSServerSelf indicates loads should be reported to the same
|
||||
// server (the authority) where the CDS resp is received from.
|
||||
ClusterLRSServerSelf
|
||||
)
|
||||
|
||||
// ClusterLBPolicyRingHash represents ring_hash lb policy, and also contains its
|
||||
// config.
|
||||
type ClusterLBPolicyRingHash struct {
|
||||
|
@ -51,8 +65,10 @@ type ClusterUpdate struct {
|
|||
// EDSServiceName is an optional name for EDS. If it's not set, the balancer
|
||||
// should watch ClusterName for the EDS resources.
|
||||
EDSServiceName string
|
||||
// EnableLRS indicates whether or not load should be reported through LRS.
|
||||
EnableLRS bool
|
||||
// LRSServerConfig contains the server where the load reports should be sent
|
||||
// to. This can be change to an interface, to support other types, e.g. a
|
||||
// ServerConfig with ServerURI, creds.
|
||||
LRSServerConfig ClusterLRSServerConfigType
|
||||
// SecurityCfg contains security configuration sent by the control plane.
|
||||
SecurityCfg *SecurityConfig
|
||||
// MaxRequests for circuit breaking, if any (otherwise nil).
|
||||
|
|
|
@ -127,12 +127,21 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
|
|||
|
||||
ret := ClusterUpdate{
|
||||
ClusterName: cluster.GetName(),
|
||||
EnableLRS: cluster.GetLrsServer().GetSelf() != nil,
|
||||
SecurityCfg: sc,
|
||||
MaxRequests: circuitBreakersFromCluster(cluster),
|
||||
LBPolicy: lbPolicy,
|
||||
}
|
||||
|
||||
// Note that this is different from the gRFC (gRFC A47 says to include the
|
||||
// full ServerConfig{URL,creds,server feature} here). This information is
|
||||
// not available here, because this function doesn't have access to the
|
||||
// xdsclient bootstrap information now (can be added if necessary). The
|
||||
// ServerConfig will be read and populated by the CDS balancer when
|
||||
// processing this field.
|
||||
if cluster.GetLrsServer().GetSelf() != nil {
|
||||
ret.LRSServerConfig = ClusterLRSServerSelf
|
||||
}
|
||||
|
||||
// Validate and set cluster type from the response.
|
||||
switch {
|
||||
case cluster.GetType() == v3clusterpb.Cluster_EDS:
|
||||
|
|
|
@ -46,7 +46,7 @@ const (
|
|||
serviceName = "service"
|
||||
)
|
||||
|
||||
var emptyUpdate = ClusterUpdate{ClusterName: clusterName, EnableLRS: false}
|
||||
var emptyUpdate = ClusterUpdate{ClusterName: clusterName, LRSServerConfig: ClusterLRSOff}
|
||||
|
||||
func (s) TestValidateCluster_Failure(t *testing.T) {
|
||||
tests := []struct {
|
||||
|
@ -263,7 +263,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName, EnableLRS: false, ClusterType: ClusterTypeAggregate,
|
||||
ClusterName: clusterName, LRSServerConfig: ClusterLRSOff, ClusterType: ClusterTypeAggregate,
|
||||
PrioritizedClusterNames: []string{"a", "b", "c"},
|
||||
},
|
||||
},
|
||||
|
@ -298,7 +298,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
},
|
||||
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
|
||||
},
|
||||
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: false},
|
||||
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSOff},
|
||||
},
|
||||
{
|
||||
name: "happiest-case",
|
||||
|
@ -320,7 +320,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true},
|
||||
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf},
|
||||
},
|
||||
{
|
||||
name: "happiest-case-with-circuitbreakers",
|
||||
|
@ -354,7 +354,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true, MaxRequests: func() *uint32 { i := uint32(512); return &i }()},
|
||||
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf, MaxRequests: func() *uint32 { i := uint32(512); return &i }()},
|
||||
},
|
||||
{
|
||||
name: "happiest-case-with-ring-hash-lb-policy-with-default-config",
|
||||
|
@ -377,7 +377,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true,
|
||||
ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf,
|
||||
LBPolicy: &ClusterLBPolicyRingHash{MinimumRingSize: defaultRingHashMinSize, MaximumRingSize: defaultRingHashMaxSize},
|
||||
},
|
||||
},
|
||||
|
@ -408,7 +408,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true,
|
||||
ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf,
|
||||
LBPolicy: &ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
|
||||
},
|
||||
},
|
||||
|
@ -468,9 +468,9 @@ func (s) TestValidateClusterWithSecurityConfig_EnvVarOff(t *testing.T) {
|
|||
},
|
||||
}
|
||||
wantUpdate := ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
}
|
||||
gotUpdate, err := validateClusterAndConstructClusterUpdate(cluster)
|
||||
if err != nil {
|
||||
|
@ -1082,9 +1082,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
SecurityCfg: &SecurityConfig{
|
||||
RootInstanceName: rootPluginInstance,
|
||||
RootCertName: rootCertName,
|
||||
|
@ -1124,9 +1124,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
SecurityCfg: &SecurityConfig{
|
||||
RootInstanceName: rootPluginInstance,
|
||||
RootCertName: rootCertName,
|
||||
|
@ -1168,9 +1168,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
SecurityCfg: &SecurityConfig{
|
||||
RootInstanceName: rootPluginInstance,
|
||||
RootCertName: rootCertName,
|
||||
|
@ -1216,9 +1216,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
SecurityCfg: &SecurityConfig{
|
||||
RootInstanceName: rootPluginInstance,
|
||||
RootCertName: rootCertName,
|
||||
|
@ -1276,9 +1276,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
SecurityCfg: &SecurityConfig{
|
||||
RootInstanceName: rootPluginInstance,
|
||||
RootCertName: rootCertName,
|
||||
|
@ -1343,9 +1343,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
wantUpdate: ClusterUpdate{
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
EnableLRS: false,
|
||||
ClusterName: clusterName,
|
||||
EDSServiceName: serviceName,
|
||||
LRSServerConfig: ClusterLRSOff,
|
||||
SecurityCfg: &SecurityConfig{
|
||||
RootInstanceName: rootPluginInstance,
|
||||
RootCertName: rootCertName,
|
||||
|
@ -1489,7 +1489,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
|
|||
wantUpdate: map[string]ClusterUpdateErrTuple{
|
||||
v2ClusterName: {Update: ClusterUpdate{
|
||||
ClusterName: v2ClusterName,
|
||||
EDSServiceName: v2Service, EnableLRS: true,
|
||||
EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf,
|
||||
Raw: v2ClusterAny,
|
||||
}},
|
||||
},
|
||||
|
@ -1504,7 +1504,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
|
|||
wantUpdate: map[string]ClusterUpdateErrTuple{
|
||||
v3ClusterName: {Update: ClusterUpdate{
|
||||
ClusterName: v3ClusterName,
|
||||
EDSServiceName: v3Service, EnableLRS: true,
|
||||
EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf,
|
||||
Raw: v3ClusterAny,
|
||||
}},
|
||||
},
|
||||
|
@ -1519,12 +1519,12 @@ func (s) TestUnmarshalCluster(t *testing.T) {
|
|||
wantUpdate: map[string]ClusterUpdateErrTuple{
|
||||
v2ClusterName: {Update: ClusterUpdate{
|
||||
ClusterName: v2ClusterName,
|
||||
EDSServiceName: v2Service, EnableLRS: true,
|
||||
EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf,
|
||||
Raw: v2ClusterAny,
|
||||
}},
|
||||
v3ClusterName: {Update: ClusterUpdate{
|
||||
ClusterName: v3ClusterName,
|
||||
EDSServiceName: v3Service, EnableLRS: true,
|
||||
EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf,
|
||||
Raw: v3ClusterAny,
|
||||
}},
|
||||
},
|
||||
|
@ -1548,12 +1548,12 @@ func (s) TestUnmarshalCluster(t *testing.T) {
|
|||
wantUpdate: map[string]ClusterUpdateErrTuple{
|
||||
v2ClusterName: {Update: ClusterUpdate{
|
||||
ClusterName: v2ClusterName,
|
||||
EDSServiceName: v2Service, EnableLRS: true,
|
||||
EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf,
|
||||
Raw: v2ClusterAny,
|
||||
}},
|
||||
v3ClusterName: {Update: ClusterUpdate{
|
||||
ClusterName: v3ClusterName,
|
||||
EDSServiceName: v3Service, EnableLRS: true,
|
||||
EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf,
|
||||
Raw: v3ClusterAny,
|
||||
}},
|
||||
"bad": {Err: cmpopts.AnyError},
|
||||
|
|
Loading…
Reference in New Issue