mirror of https://github.com/grpc/grpc-go.git
xds: Process telemetry labels from CDS in xDS Balancers (#7116)
This commit is contained in:
parent
a4afd4d995
commit
b37cd8133a
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2024 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package stats provides internal stats related functionality.
|
||||
package stats
|
||||
|
||||
import "context"
|
||||
|
||||
// Labels are the labels for metrics.
|
||||
type Labels struct {
|
||||
// TelemetryLabels are the telemetry labels to record.
|
||||
TelemetryLabels map[string]string
|
||||
}
|
||||
|
||||
type labelsKey struct{}
|
||||
|
||||
// GetLabels returns the Labels stored in the context, or nil if there is one.
|
||||
func GetLabels(ctx context.Context) *Labels {
|
||||
labels, _ := ctx.Value(labelsKey{}).(*Labels)
|
||||
return labels
|
||||
}
|
||||
|
||||
// SetLabels sets the Labels in the context.
|
||||
func SetLabels(ctx context.Context, labels *Labels) context.Context {
|
||||
// could also append
|
||||
return context.WithValue(ctx, labelsKey{}, labels)
|
||||
}
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2024 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
package xds_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
istats "google.golang.org/grpc/internal/stats"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/internal/testutils/xds/e2e"
|
||||
"google.golang.org/grpc/stats"
|
||||
|
||||
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
||||
testpb "google.golang.org/grpc/interop/grpc_testing"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
const serviceNameKey = "service_name"
|
||||
const serviceNamespaceKey = "service_namespace"
|
||||
const serviceNameValue = "grpc-service"
|
||||
const serviceNamespaceValue = "grpc-service-namespace"
|
||||
|
||||
// TestTelemetryLabels tests that telemetry labels from CDS make their way to
|
||||
// the stats handler. The stats handler sets the mutable context value that the
|
||||
// cluster impl picker will write telemetry labels to, and then the stats
|
||||
// handler asserts that subsequent HandleRPC calls from the RPC lifecycle
|
||||
// contain telemetry labels that it can see.
|
||||
func (s) TestTelemetryLabels(t *testing.T) {
|
||||
managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
|
||||
defer cleanup()
|
||||
|
||||
server := stubserver.StartTestService(t, nil)
|
||||
defer server.Stop()
|
||||
|
||||
const xdsServiceName = "my-service-client-side-xds"
|
||||
resources := e2e.DefaultClientResources(e2e.ResourceParams{
|
||||
DialTarget: xdsServiceName,
|
||||
NodeID: nodeID,
|
||||
Host: "localhost",
|
||||
Port: testutils.ParsePort(t, server.Address),
|
||||
SecLevel: e2e.SecurityLevelNone,
|
||||
})
|
||||
|
||||
resources.Clusters[0].Metadata = &v3corepb.Metadata{
|
||||
FilterMetadata: map[string]*structpb.Struct{
|
||||
"com.google.csm.telemetry_labels": {
|
||||
Fields: map[string]*structpb.Value{
|
||||
serviceNameKey: structpb.NewStringValue(serviceNameValue),
|
||||
serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
if err := managementServer.Update(ctx, resources); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fsh := &fakeStatsHandler{
|
||||
t: t,
|
||||
}
|
||||
|
||||
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create a new client to local test server: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
client := testgrpc.NewTestServiceClient(cc)
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
||||
t.Fatalf("rpc EmptyCall() failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeStatsHandler struct {
|
||||
labels *istats.Labels
|
||||
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
|
||||
|
||||
func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
|
||||
labels := &istats.Labels{
|
||||
TelemetryLabels: make(map[string]string),
|
||||
}
|
||||
fsh.labels = labels
|
||||
ctx = istats.SetLabels(ctx, labels) // ctx passed is immutable, however cluster_impl writes to the map of Telemetry Labels on the heap.
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
|
||||
switch rs.(type) {
|
||||
// stats.Begin won't get Telemetry Labels because happens after picker
|
||||
// picks.
|
||||
|
||||
// These three stats callouts trigger all metrics for OpenTelemetry that
|
||||
// aren't started. All of these should have access to the desired telemetry
|
||||
// labels.
|
||||
case *stats.OutPayload, *stats.InPayload, *stats.End:
|
||||
if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
|
||||
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
|
||||
}
|
||||
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
|
||||
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
|
||||
}
|
||||
|
||||
default:
|
||||
// Nothing to assert for the other stats.Handler callouts.
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -645,6 +645,8 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
|
|||
}
|
||||
dm.OutlierDetection = odJSON
|
||||
|
||||
dm.TelemetryLabels = cluster.TelemetryLabels
|
||||
|
||||
return append(dms, dm), true, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -123,6 +123,7 @@ type clusterImplBalancer struct {
|
|||
requestCounterService string // The service name for the request counter.
|
||||
requestCounter *xdsclient.ClusterRequestsCounter
|
||||
requestCountMax uint32
|
||||
telemetryLabels map[string]string
|
||||
pickerUpdateCh *buffer.Unbounded
|
||||
}
|
||||
|
||||
|
|
@ -465,18 +466,19 @@ func (b *clusterImplBalancer) run() {
|
|||
b.childState = u
|
||||
b.ClientConn.UpdateState(balancer.State{
|
||||
ConnectivityState: b.childState.ConnectivityState,
|
||||
Picker: newPicker(b.childState, &dropConfigs{
|
||||
Picker: b.newPicker(&dropConfigs{
|
||||
drops: b.drops,
|
||||
requestCounter: b.requestCounter,
|
||||
requestCountMax: b.requestCountMax,
|
||||
}, b.loadWrapper),
|
||||
}),
|
||||
})
|
||||
case *LBConfig:
|
||||
b.telemetryLabels = u.TelemetryLabels
|
||||
dc := b.handleDropAndRequestCount(u)
|
||||
if dc != nil && b.childState.Picker != nil {
|
||||
b.ClientConn.UpdateState(balancer.State{
|
||||
ConnectivityState: b.childState.ConnectivityState,
|
||||
Picker: newPicker(b.childState, dc, b.loadWrapper),
|
||||
Picker: b.newPicker(dc),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,10 +40,12 @@ type LBConfig struct {
|
|||
EDSServiceName string `json:"edsServiceName,omitempty"`
|
||||
// LoadReportingServer is the LRS server to send load reports to. If not
|
||||
// present, load reporting will be disabled.
|
||||
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
|
||||
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
|
||||
DropCategories []DropConfig `json:"dropCategories,omitempty"`
|
||||
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
|
||||
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
|
||||
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
|
||||
DropCategories []DropConfig `json:"dropCategories,omitempty"`
|
||||
// TelemetryLabels are the telemetry Labels associated with this cluster.
|
||||
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
|
||||
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
|
||||
}
|
||||
|
||||
func parseConfig(c json.RawMessage) (*LBConfig, error) {
|
||||
|
|
|
|||
|
|
@ -19,15 +19,14 @@
|
|||
package clusterimpl
|
||||
|
||||
import (
|
||||
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/internal/stats"
|
||||
"google.golang.org/grpc/internal/wrr"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/load"
|
||||
|
||||
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
|
||||
)
|
||||
|
||||
// NewRandomWRR is used when calculating drops. It's exported so that tests can
|
||||
|
|
@ -78,24 +77,36 @@ type loadReporter interface {
|
|||
|
||||
// Picker implements RPC drop, circuit breaking drop and load reporting.
|
||||
type picker struct {
|
||||
drops []*dropper
|
||||
s balancer.State
|
||||
loadStore loadReporter
|
||||
counter *xdsclient.ClusterRequestsCounter
|
||||
countMax uint32
|
||||
drops []*dropper
|
||||
s balancer.State
|
||||
loadStore loadReporter
|
||||
counter *xdsclient.ClusterRequestsCounter
|
||||
countMax uint32
|
||||
telemetryLabels map[string]string
|
||||
}
|
||||
|
||||
func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
|
||||
func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker {
|
||||
return &picker{
|
||||
drops: config.drops,
|
||||
s: s,
|
||||
loadStore: loadStore,
|
||||
counter: config.requestCounter,
|
||||
countMax: config.requestCountMax,
|
||||
drops: config.drops,
|
||||
s: b.childState,
|
||||
loadStore: b.loadWrapper,
|
||||
counter: config.requestCounter,
|
||||
countMax: config.requestCountMax,
|
||||
telemetryLabels: b.telemetryLabels,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
|
||||
// Unconditionally set labels if present, even dropped or queued RPC's can
|
||||
// use these labels.
|
||||
if info.Ctx != nil {
|
||||
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
|
||||
for key, value := range d.telemetryLabels {
|
||||
labels.TelemetryLabels[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Don't drop unless the inner picker is READY. Similar to
|
||||
// https://github.com/grpc/grpc-go/issues/2622.
|
||||
if d.s.ConnectivityState == connectivity.Ready {
|
||||
|
|
|
|||
|
|
@ -103,6 +103,8 @@ type DiscoveryMechanism struct {
|
|||
// OutlierDetection is the Outlier Detection LB configuration for this
|
||||
// priority.
|
||||
OutlierDetection json.RawMessage `json:"outlierDetection,omitempty"`
|
||||
// TelemetryLabels are the telemetry labels associated with this cluster.
|
||||
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
|
||||
outlierDetection outlierdetection.LBConfig
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -146,8 +146,9 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism
|
|||
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
|
||||
}
|
||||
return pName, &clusterimpl.LBConfig{
|
||||
Cluster: mechanism.Cluster,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
|
||||
Cluster: mechanism.Cluster,
|
||||
TelemetryLabels: mechanism.TelemetryLabels,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
|
||||
}, retAddrs
|
||||
}
|
||||
|
||||
|
|
@ -283,6 +284,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
|
|||
EDSServiceName: mechanism.EDSServiceName,
|
||||
LoadReportingServer: mechanism.LoadReportingServer,
|
||||
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
|
||||
TelemetryLabels: mechanism.TelemetryLabels,
|
||||
DropCategories: drops,
|
||||
ChildPolicy: xdsLBPolicy,
|
||||
}, addrs, nil
|
||||
|
|
|
|||
Loading…
Reference in New Issue