xds: add MetricsReporter for generic xds client (#8274)

This commit is contained in:
Purnesh Dixit 2025-05-08 22:48:00 +05:30 committed by GitHub
parent d00f4acc38
commit b3d63b180c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 406 additions and 2 deletions

View File

@ -100,3 +100,13 @@ type Locality struct {
// SubZone is the further subdivision within a zone.
SubZone string
}
// MetricsReporter is the sink through which an xDS client reports metric
// events. The XDSClient code in this change only calls it when a non-nil
// reporter was configured.
type MetricsReporter interface {
// ReportMetric reports a single metric event. The dynamic type of metric
// is one of a predefined set of types that depends on the reporting
// client (XDSClient or LRSClient); the XDSClient passes pointers to
// structs declared in its metrics package.
//
// Each client produces a different set of metrics. See the client's
// documentation for the list of metric events it can emit.
ReportMetric(metric any)
}

View File

@ -26,8 +26,10 @@ import (
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/internal/syncutil"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/grpc/xds/internal/clients/xdsclient/metrics"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
@ -87,6 +89,7 @@ type authority struct {
xdsClientSerializerClose func() // Function to close the above serializer.
logger *igrpclog.PrefixLogger // Logger for this authority.
target string // The gRPC Channel target.
metricsReporter clients.MetricsReporter
// The below defined fields must only be accessed in the context of the
// serializer callback, owned by this authority.
@ -121,6 +124,7 @@ type authorityBuildOptions struct {
getChannelForADS xdsChannelForADS // Function to acquire a reference to an xdsChannel
logPrefix string // Prefix for logging
target string // Target for the gRPC Channel that owns xDS Client/Authority
metricsReporter clients.MetricsReporter // Metrics reporter for the authority
}
// newAuthority creates a new authority instance with the provided
@ -145,6 +149,7 @@ func newAuthority(args authorityBuildOptions) *authority {
logger: igrpclog.NewPrefixLogger(l, logPrefix),
resources: make(map[ResourceType]map[string]*resourceState),
target: args.target,
metricsReporter: args.metricsReporter,
}
// Create an ordered list of xdsChannels with their server configs. The
@ -363,6 +368,11 @@ func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType Re
// On error, keep previous version of the resource. But update status
// and error.
if uErr.Err != nil {
if a.metricsReporter != nil {
a.metricsReporter.ReportMetric(&metrics.ResourceUpdateInvalid{
ServerURI: serverConfig.ServerIdentifier.ServerURI, ResourceType: rType.TypeName,
})
}
state.md.ErrState = md.ErrState
state.md.Status = md.Status
for watcher := range state.watchers {
@ -378,6 +388,12 @@ func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType Re
continue
}
if a.metricsReporter != nil {
a.metricsReporter.ReportMetric(&metrics.ResourceUpdateValid{
ServerURI: serverConfig.ServerIdentifier.ServerURI, ResourceType: rType.TypeName,
})
}
if state.deletionIgnored {
state.deletionIgnored = false
a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName)

View File

@ -0,0 +1,42 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package metrics defines all metrics that can be produced by an xDS client.
// All calls to the MetricsReporter by the xDS client will contain a struct
// from this package passed by pointer.
package metrics
// ResourceUpdateValid is a metric to report a valid resource update from
// the xDS management server for a given resource type.
type ResourceUpdateValid struct {
// ServerURI is the URI of the management server that sent the update; the
// authority fills it from the server config's ServerIdentifier.ServerURI.
ServerURI string
// ResourceType is the type name of the updated resource; the authority
// fills it from the ResourceType's TypeName (e.g. "ListenerResource").
ResourceType string
}
// ResourceUpdateInvalid is a metric to report an invalid resource update
// from the xDS management server for a given resource type. The authority
// reports it when the update for a resource carries a non-nil error.
type ResourceUpdateInvalid struct {
// ServerURI is the URI of the management server that sent the update; the
// authority fills it from the server config's ServerIdentifier.ServerURI.
ServerURI string
// ResourceType is the type name of the rejected resource; the authority
// fills it from the ResourceType's TypeName (e.g. "ListenerResource").
ResourceType string
}
// ServerFailure is a metric to report a server failure of the xDS
// management server. The client reports it on an ADS stream failure unless
// the error type is ErrTypeStreamFailedAfterRecv.
type ServerFailure struct {
// ServerURI is the URI of the management server whose stream failed; the
// client fills it from the channel's server config ServerIdentifier.ServerURI.
ServerURI string
}

View File

@ -20,6 +20,7 @@ package xdsclient_test
import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
@ -29,12 +30,14 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/xds/internal/clients/internal/pretty"
"google.golang.org/grpc/xds/internal/clients/internal/testutils"
"google.golang.org/grpc/xds/internal/clients/xdsclient"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/protobuf/proto"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/google/go-cmp/cmp"
)
type s struct {
@ -262,3 +265,36 @@ func buildResourceName(typeName, auth, id string, ctxParams map[string]string) s
ContextParams: ctxParams,
}).String()
}
// testMetricsReporter is a MetricsReporter to be used in tests. It forwards
// every reported metric onto a channel and provides a helper to wait for a
// metric and compare it against an expected value.
type testMetricsReporter struct {
// metricsCh carries the metrics passed to ReportMetric, in the order they
// were sent, until waitForMetric receives them.
metricsCh *testutils.Channel
}
// newTestMetricsReporter builds a testMetricsReporter whose metrics channel
// is created with a size of 1.
func newTestMetricsReporter() *testMetricsReporter {
	reporter := new(testMetricsReporter)
	reporter.metricsCh = testutils.NewChannelWithSize(1)
	return reporter
}
// waitForMetric waits until a metric is received on metricsCh and compares
// it against metricsDataWant.
//
// It returns nil when the received metric matches metricsDataWant. It returns
// an error wrapping the receive error when no metric could be received
// (NOTE(review): presumably only when ctx expires or is cancelled — confirm
// against testutils.Channel.Receive), and an error carrying the cmp diff when
// a metric was received but differs from metricsDataWant.
func (r *testMetricsReporter) waitForMetric(ctx context.Context, metricsDataWant any) error {
	got, err := r.metricsCh.Receive(ctx)
	if err != nil {
		// Name the metric type being awaited and keep the underlying cause
		// so a failing test shows what was expected and why the wait ended.
		return fmt.Errorf("timeout waiting for metric %T: %w", metricsDataWant, err)
	}
	if diff := cmp.Diff(got, metricsDataWant); diff != "" {
		return fmt.Errorf("received unexpected metrics value (-got, +want): %v", diff)
	}
	return nil
}
// ReportMetric implements the MetricsReporter method by sending the metric m
// to the metricsCh channel, where waitForMetric picks it up.
//
// NOTE(review): metricsCh is created with size 1; whether Send blocks or
// drops when an earlier metric has not been received yet depends on
// testutils.Channel — confirm.
func (r *testMetricsReporter) ReportMetric(m any) {
r.metricsCh.Send(m)
}

View File

@ -0,0 +1,284 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient_test
import (
"context"
"net"
"testing"
"github.com/google/uuid"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/grpctransport"
"google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e"
"google.golang.org/grpc/xds/internal/clients/xdsclient"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/grpc/xds/internal/clients/xdsclient/metrics"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
)
// TestResourceUpdateMetrics configures an xDS client, and a management server
// to send valid and invalid LDS updates, and verifies that the expected metrics
// for both good and bad updates are emitted. It finally verifies that no
// further valid-update metric shows up after the invalid update.
func (s) TestResourceUpdateMetrics(t *testing.T) {
	// Use the same overall deadline as the other metrics tests so that a hang
	// fails the test promptly instead of stalling the whole test run.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	tmr := newTestMetricsReporter()
	l, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}

	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l})
	const listenerResourceName = "test-listener-resource"
	const routeConfigurationName = "test-route-configuration-resource"
	nodeID := uuid.New().String()
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
	}

	resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
	si := clients.ServerIdentifier{
		ServerURI:  mgmtServer.Address,
		Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
	}
	configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
	xdsClientConfig := xdsclient.Config{
		Servers:          []xdsclient.ServerConfig{{ServerIdentifier: si}},
		Node:             clients.Node{ID: nodeID},
		TransportBuilder: grpctransport.NewBuilder(configs),
		ResourceTypes:    resourceTypes,
		// Xdstp resource names used in this test do not specify an
		// authority. These will end up looking up an entry with the
		// empty key in the authorities map. Having an entry with an
		// empty key and empty configuration, results in these
		// resources also using the top-level configuration.
		Authorities: map[string]xdsclient.Authority{
			"": {XDSServers: []xdsclient.ServerConfig{}},
		},
		MetricsReporter: tmr,
	}

	// Create an xDS client with the above config.
	client, err := xdsclient.New(xdsClientConfig)
	if err != nil {
		t.Fatalf("Failed to create xDS client: %v", err)
	}
	defer client.Close()

	// Watch the valid listener configured on the management server. This should
	// cause a resource update valid metric to emit eventually.
	client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{})
	if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil {
		t.Fatal(err)
	}

	// Update management server with a bad update. This should cause a resource
	// update invalid metric to emit eventually.
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
		SkipValidation: true,
	}
	// Clearing ApiListener is what makes this listener update a bad one.
	resources.Listeners[0].ApiListener = nil
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
	}
	if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateInvalid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil {
		t.Fatal(err)
	}

	// A resource update valid metric must not be emitted after the bad
	// update: waiting for one is expected to fail within the short timeout.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if err := tmr.waitForMetric(sCtx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err == nil {
		t.Fatal("Received a ResourceUpdateValid metric after an invalid update when none was expected")
	}
}
// TestServerFailureMetrics_BeforeResponseRecv starts a management server on a
// restartable listener, creates an xDS client that reports to a test metrics
// reporter, and registers a listener watch. Once the ADS stream is observed
// to open, the listener is stopped without any resource having been
// configured on the management server, and the test expects a ServerFailure
// metric for that server.
func (s) TestServerFailureMetrics_BeforeResponseRecv(t *testing.T) {
	const listenerResourceName = "test-listener-resource"

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	reporter := newTestMetricsReporter()

	tcpLis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}
	restartableLis := testutils.NewRestartableListener(tcpLis)

	// The buffer of one lets the first stream-open notification through
	// without a reader; any further notifications are dropped.
	streamOpened := make(chan struct{}, 1)
	onStreamOpen := func(context.Context, int64, string) error {
		select {
		case streamOpened <- struct{}{}:
		default:
		}
		return nil
	}
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
		Listener:     restartableLis,
		OnStreamOpen: onStreamOpen,
	})

	nodeID := uuid.New().String()
	serverID := clients.ServerIdentifier{
		ServerURI:  mgmtServer.Address,
		Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
	}
	transportConfigs := map[string]grpctransport.Config{
		"insecure": {Credentials: insecure.NewBundle()},
	}

	// Resource names in this test carry no authority, so they are looked up
	// under the empty key of the authorities map. An empty-key entry with an
	// empty configuration makes them fall back to the top-level servers.
	authorities := map[string]xdsclient.Authority{
		"": {XDSServers: []xdsclient.ServerConfig{}},
	}

	// Build the xDS client from the pieces assembled above.
	client, err := xdsclient.New(xdsclient.Config{
		Servers:          []xdsclient.ServerConfig{{ServerIdentifier: serverID}},
		Node:             clients.Node{ID: nodeID},
		TransportBuilder: grpctransport.NewBuilder(transportConfigs),
		ResourceTypes:    map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType},
		Authorities:      authorities,
		MetricsReporter:  reporter,
	})
	if err != nil {
		t.Fatalf("Failed to create xDS client: %v", err)
	}
	defer client.Close()

	// Register a watch for the listener on the management server above.
	client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{})

	// Wait until the management server reports that an ADS stream opened.
	select {
	case <-streamOpened:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for ADS stream to open")
	}

	// Stopping the listener breaks the ADS stream, which should eventually
	// produce a server failure metric.
	restartableLis.Stop()

	// Restart to prevent the attempt to create a new ADS stream after back off.
	restartableLis.Restart()

	wantMetric := &metrics.ServerFailure{ServerURI: mgmtServer.Address}
	if err := reporter.waitForMetric(ctx, wantMetric); err != nil {
		t.Fatal(err.Error())
	}
}
// TestServerFailureMetrics_AfterResponseRecv configures an xDS client, and a
// management server to send a valid LDS update, and verifies that the
// resource update valid metric is emitted. It then closes the management server
// listener to close the ADS stream and verifies that the server failure metric
// is not emitted because the ADS stream was closed after having received a
// response on the stream.
func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	tmr := newTestMetricsReporter()
	l, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}
	lis := testutils.NewRestartableListener(l)
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis})
	const listenerResourceName = "test-listener-resource"
	const routeConfigurationName = "test-route-configuration-resource"
	nodeID := uuid.New().String()
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
	}

	resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
	si := clients.ServerIdentifier{
		ServerURI:  mgmtServer.Address,
		Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
	}
	configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
	xdsClientConfig := xdsclient.Config{
		Servers:          []xdsclient.ServerConfig{{ServerIdentifier: si}},
		Node:             clients.Node{ID: nodeID},
		TransportBuilder: grpctransport.NewBuilder(configs),
		ResourceTypes:    resourceTypes,
		// Xdstp resource names used in this test do not specify an
		// authority. These will end up looking up an entry with the
		// empty key in the authorities map. Having an entry with an
		// empty key and empty configuration, results in these
		// resources also using the top-level configuration.
		Authorities: map[string]xdsclient.Authority{
			"": {XDSServers: []xdsclient.ServerConfig{}},
		},
		MetricsReporter: tmr,
	}

	// Create an xDS client with the above config.
	client, err := xdsclient.New(xdsClientConfig)
	if err != nil {
		t.Fatalf("Failed to create xDS client: %v", err)
	}
	defer client.Close()

	// Watch the valid listener configured on the management server. This should
	// cause a resource update valid metric to emit eventually.
	client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{})
	if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil {
		t.Fatal(err)
	}

	// Close the listener and ensure that the ADS stream breaks. Because a
	// response was already received on the stream, this must NOT cause a
	// server failure metric to be emitted.
	lis.Stop()
	if ctx.Err() != nil {
		t.Fatal("Timeout when waiting for ADS stream to close")
	}

	// Restart to prevent the attempt to create a new ADS stream after back off.
	lis.Restart()

	// Server failure should not have emitted: waiting for one is expected to
	// fail within the short timeout.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if err := tmr.waitForMetric(sCtx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err == nil {
		t.Fatal("Received a ServerFailure metric when none was expected, since the ADS stream failed after receiving a response")
	}
}

View File

@ -44,6 +44,7 @@ import (
"google.golang.org/grpc/xds/internal/clients/internal/backoff"
"google.golang.org/grpc/xds/internal/clients/internal/syncutil"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/grpc/xds/internal/clients/xdsclient/metrics"
"google.golang.org/protobuf/proto"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
@ -82,8 +83,9 @@ type XDSClient struct {
resourceTypes map[string]ResourceType // Registry of resource types, for parsing incoming ADS responses.
serializer *syncutil.CallbackSerializer // Serializer for invoking resource watcher callbacks.
serializerClose func() // Function to close the serializer.
logger *grpclog.PrefixLogger // Logger for this client.
target string // The gRPC target for this client.
logger *grpclog.PrefixLogger
target string
metricsReporter clients.MetricsReporter
// The XDSClient owns a bunch of channels to individual xDS servers
// specified in the xDS client configuration. Authorities acquire references
@ -138,6 +140,7 @@ func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff f
transportBuilder: config.TransportBuilder,
resourceTypes: config.ResourceTypes,
xdsActiveChannels: make(map[ServerConfig]*channelState),
metricsReporter: config.MetricsReporter,
}
for name, cfg := range config.Authorities {
@ -154,6 +157,7 @@ func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff f
getChannelForADS: c.getChannelForADS,
logPrefix: clientPrefix(c),
target: target,
metricsReporter: c.metricsReporter,
})
}
c.topLevelAuthority = newAuthority(authorityBuildOptions{
@ -163,8 +167,10 @@ func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff f
getChannelForADS: c.getChannelForADS,
logPrefix: clientPrefix(c),
target: target,
metricsReporter: c.metricsReporter,
})
c.logger = prefixLogger(c)
return c, nil
}
@ -384,6 +390,12 @@ func (cs *channelState) adsStreamFailure(err error) {
return
}
if xdsresource.ErrType(err) != xdsresource.ErrTypeStreamFailedAfterRecv && cs.parent.metricsReporter != nil {
cs.parent.metricsReporter.ReportMetric(&metrics.ServerFailure{
ServerURI: cs.serverConfig.ServerIdentifier.ServerURI,
})
}
cs.parent.channelsMu.Lock()
defer cs.parent.channelsMu.Unlock()
for authority := range cs.interestedAuthorities {

View File

@ -56,6 +56,10 @@ type Config struct {
//
// For example: "type.googleapis.com/envoy.config.listener.v3.Listener"
ResourceTypes map[string]ResourceType
// MetricsReporter is used to report registered metrics. If unset, no
// metrics will be reported.
MetricsReporter clients.MetricsReporter
}
// ServerConfig contains configuration for an xDS management server.