diff --git a/xds/internal/clients/config.go b/xds/internal/clients/config.go index 0ededfae7..f106465f6 100644 --- a/xds/internal/clients/config.go +++ b/xds/internal/clients/config.go @@ -100,3 +100,13 @@ type Locality struct { // SubZone is the further subdivision within a zone. SubZone string } + +// MetricsReporter is used by a client (XDSClient or LRSClient) to report metrics. +type MetricsReporter interface { + // ReportMetric reports a metric. The metric will be one of the predefined + // set of types depending on the client (XDSClient or LRSClient). + // + // Each client will produce different metrics. Please see the client's + // documentation for a list of possible metrics events. + ReportMetric(metric any) +} diff --git a/xds/internal/clients/xdsclient/authority.go b/xds/internal/clients/xdsclient/authority.go index 3ada1465c..7a3a29691 100644 --- a/xds/internal/clients/xdsclient/authority.go +++ b/xds/internal/clients/xdsclient/authority.go @@ -26,8 +26,10 @@ import ( "google.golang.org/grpc/grpclog" igrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/xds/internal/clients" "google.golang.org/grpc/xds/internal/clients/internal/syncutil" "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + "google.golang.org/grpc/xds/internal/clients/xdsclient/metrics" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -87,6 +89,7 @@ type authority struct { xdsClientSerializerClose func() // Function to close the above serializer. logger *igrpclog.PrefixLogger // Logger for this authority. target string // The gRPC Channel target. + metricsReporter clients.MetricsReporter // The below defined fields must only be accessed in the context of the // serializer callback, owned by this authority. 
@@ -121,6 +124,7 @@ type authorityBuildOptions struct { getChannelForADS xdsChannelForADS // Function to acquire a reference to an xdsChannel logPrefix string // Prefix for logging target string // Target for the gRPC Channel that owns xDS Client/Authority + metricsReporter clients.MetricsReporter // Metrics reporter for the authority } // newAuthority creates a new authority instance with the provided @@ -145,6 +149,7 @@ func newAuthority(args authorityBuildOptions) *authority { logger: igrpclog.NewPrefixLogger(l, logPrefix), resources: make(map[ResourceType]map[string]*resourceState), target: args.target, + metricsReporter: args.metricsReporter, } // Create an ordered list of xdsChannels with their server configs. The @@ -363,6 +368,11 @@ func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType Re // On error, keep previous version of the resource. But update status // and error. if uErr.Err != nil { + if a.metricsReporter != nil { + a.metricsReporter.ReportMetric(&metrics.ResourceUpdateInvalid{ + ServerURI: serverConfig.ServerIdentifier.ServerURI, ResourceType: rType.TypeName, + }) + } state.md.ErrState = md.ErrState state.md.Status = md.Status for watcher := range state.watchers { @@ -378,6 +388,12 @@ func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType Re continue } + if a.metricsReporter != nil { + a.metricsReporter.ReportMetric(&metrics.ResourceUpdateValid{ + ServerURI: serverConfig.ServerIdentifier.ServerURI, ResourceType: rType.TypeName, + }) + } + if state.deletionIgnored { state.deletionIgnored = false a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName) diff --git a/xds/internal/clients/xdsclient/metrics/metrics.go b/xds/internal/clients/xdsclient/metrics/metrics.go new file mode 100644 index 000000000..2e14efb1a --- /dev/null +++ b/xds/internal/clients/xdsclient/metrics/metrics.go @@ -0,0 +1,42 @@ +/* + * + * Copyright 2025 gRPC 
authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package metrics defines all metrics that can be produced by an xDS client. +// All calls to the MetricsReporter by the xDS client will contain a struct +// from this package passed by pointer. +package metrics + +// ResourceUpdateValid is a metric to report a valid resource update from +// the xDS management server for a given resource type. +type ResourceUpdateValid struct { + ServerURI string // URI of the xDS management server that sent the update. + ResourceType string // Type name of the updated resource, e.g. "ListenerResource". +} + +// ResourceUpdateInvalid is a metric to report an invalid resource update +// from the xDS management server for a given resource type. +type ResourceUpdateInvalid struct { + ServerURI string // URI of the xDS management server that sent the update. + ResourceType string // Type name of the rejected resource, e.g. "ListenerResource". +} + +// ServerFailure is a metric to report a server failure of the xDS +// management server. 
+type ServerFailure struct { + ServerURI string // URI of the xDS management server whose ADS stream failed. +} diff --git a/xds/internal/clients/xdsclient/test/helpers_test.go b/xds/internal/clients/xdsclient/test/helpers_test.go index d296fee6e..16453c805 100644 --- a/xds/internal/clients/xdsclient/test/helpers_test.go +++ b/xds/internal/clients/xdsclient/test/helpers_test.go @@ -20,6 +20,7 @@ package xdsclient_test import ( "bytes" + "context" "errors" "fmt" "strconv" @@ -29,12 +30,14 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/xds/internal/clients/internal/pretty" + "google.golang.org/grpc/xds/internal/clients/internal/testutils" "google.golang.org/grpc/xds/internal/clients/xdsclient" "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" "google.golang.org/protobuf/proto" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + "github.com/google/go-cmp/cmp" ) type s struct { @@ -262,3 +265,36 @@ func buildResourceName(typeName, auth, id string, ctxParams map[string]string) s ContextParams: ctxParams, }).String() } + +// testMetricsReporter is a MetricsReporter to be used in tests. It sends +// reported metrics on a channel and provides a helper to check that an +// expected metric was reported. +type testMetricsReporter struct { + metricsCh *testutils.Channel +} + +// newTestMetricsReporter returns a new testMetricsReporter. +func newTestMetricsReporter() *testMetricsReporter { + return &testMetricsReporter{ + metricsCh: testutils.NewChannelWithSize(1), + } +} + +// waitForMetric waits for a metric to be recorded and verifies that the +// recorded metrics data matches the expected metricsDataWant. Returns +// an error if failed to wait or received wrong data. 
+func (r *testMetricsReporter) waitForMetric(ctx context.Context, metricsDataWant any) error { + got, err := r.metricsCh.Receive(ctx) + if err != nil { + return fmt.Errorf("waiting for metric %T: %w", metricsDataWant, err) + } + if diff := cmp.Diff(got, metricsDataWant); diff != "" { + return fmt.Errorf("received unexpected metrics value (-got, +want): %v", diff) + } + return nil +} + +// ReportMetric sends the metrics data to the metricsCh channel. +func (r *testMetricsReporter) ReportMetric(m any) { + r.metricsCh.Send(m) +} diff --git a/xds/internal/clients/xdsclient/test/metrics_test.go b/xds/internal/clients/xdsclient/test/metrics_test.go new file mode 100644 index 000000000..008573615 --- /dev/null +++ b/xds/internal/clients/xdsclient/test/metrics_test.go @@ -0,0 +1,284 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package xdsclient_test + +import ( + "context" + "net" + "testing" + + "github.com/google/uuid" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/xds/internal/clients" + "google.golang.org/grpc/xds/internal/clients/grpctransport" + "google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e" + "google.golang.org/grpc/xds/internal/clients/xdsclient" + "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + "google.golang.org/grpc/xds/internal/clients/xdsclient/metrics" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" +) + +// TestResourceUpdateMetrics configures an xDS client, and a management server +// to send valid and invalid LDS updates, and verifies that the expected metrics +// for both good and bad updates are emitted. +func (s) TestResourceUpdateMetrics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := newTestMetricsReporter() + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l}) + const listenerResourceName = "test-listener-resource" + const routeConfigurationName = "test-route-configuration-resource" + nodeID := uuid.New().String() + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: 
grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + // Xdstp resource names used in this test do not specify an + // authority. These will end up looking up an entry with the + // empty key in the authorities map. Having an entry with an + // empty key and empty configuration, results in these + // resources also using the top-level configuration. + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + MetricsReporter: tmr, + } + // Create an xDS client with the above config. + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Watch the valid listener configured on the management server. This should + // cause a resource update valid metric to emit eventually. + client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{}) + if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + + // Update management server with a bad update. This should cause a resource + // update invalid metric to emit eventually. 
+ resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)}, + SkipValidation: true, + } + resources.Listeners[0].ApiListener = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateInvalid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + + // Resource update valid metric should not have been emitted. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if err := tmr.waitForMetric(sCtx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err == nil { + t.Fatal("tmr.waitForMetric(ctx, ResourceUpdateValid) succeeded when expected to timeout.") + } +} + +// TestServerFailureMetrics_BeforeResponseRecv configures an xDS client, and a +// management server. It then registers a watcher and stops the management +// server before sending a resource update, and verifies that the expected +// metric for server failure is emitted. 
+func (s) TestServerFailureMetrics_BeforeResponseRecv(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := newTestMetricsReporter() + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + lis := testutils.NewRestartableListener(l) + streamOpened := make(chan struct{}, 1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + OnStreamOpen: func(context.Context, int64, string) error { + select { + case streamOpened <- struct{}{}: + default: + } + return nil + }, + }) + + nodeID := uuid.New().String() + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + // Xdstp resource names used in this test do not specify an + // authority. These will end up looking up an entry with the + // empty key in the authorities map. Having an entry with an + // empty key and empty configuration, results in these + // resources also using the top-level configuration. + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + MetricsReporter: tmr, + } + // Create an xDS client with the above config. + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + const listenerResourceName = "test-listener-resource" + + // Watch for the listener on the above management server. 
+ client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{}) + // Wait for the management server's OnStreamOpen callback to signal that an + // ADS stream was opened; the request contents are not inspected here. + select { + case <-streamOpened: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to open") + } + + // Close the listener and ensure that the ADS stream breaks. This should + // cause a server failure metric to emit eventually. + lis.Stop() + + // Restart to prevent the attempt to create a new ADS stream after back off. + lis.Restart() + + if err := tmr.waitForMetric(ctx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err != nil { + t.Fatal(err.Error()) + } +} + +// TestServerFailureMetrics_AfterResponseRecv configures an xDS client, and a +// management server to send a valid LDS update, and verifies that the +// resource update valid metric is emitted. It then closes the management server +// listener to close the ADS stream and verifies that the server failure metric +// is not emitted because the ADS stream was closed after having received a +// response on the stream. 
+func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + tmr := newTestMetricsReporter() + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + lis := testutils.NewRestartableListener(l) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis}) + const listenerResourceName = "test-listener-resource" + const routeConfigurationName = "test-route-configuration-resource" + nodeID := uuid.New().String() + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServer.Address, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID}, + TransportBuilder: grpctransport.NewBuilder(configs), + ResourceTypes: resourceTypes, + // Xdstp resource names used in this test do not specify an + // authority. These will end up looking up an entry with the + // empty key in the authorities map. Having an entry with an + // empty key and empty configuration, results in these + // resources also using the top-level configuration. + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + MetricsReporter: tmr, + } + // Create an xDS client with the above config. 
+ client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Watch the valid listener configured on the management server. This should + // cause a resource update valid metric to emit eventually. + client.WatchResource(listenerType.TypeURL, listenerResourceName, noopListenerWatcher{}) + if err := tmr.waitForMetric(ctx, &metrics.ResourceUpdateValid{ServerURI: mgmtServer.Address, ResourceType: "ListenerResource"}); err != nil { + t.Fatal(err.Error()) + } + + // Close the listener and ensure that the ADS stream breaks. This should not + // cause a server failure metric, as a response was already received. + lis.Stop() + if ctx.Err() != nil { + t.Fatalf("Timeout when waiting for ADS stream to close") + } + // Restart to prevent the attempt to create a new ADS stream after back off. + lis.Restart() + + // Server failure should not have emitted. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if err := tmr.waitForMetric(sCtx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err == nil { + t.Fatal("tmr.waitForMetric(ctx, ServerFailure) succeeded when expected to timeout.") + } +} diff --git a/xds/internal/clients/xdsclient/xdsclient.go b/xds/internal/clients/xdsclient/xdsclient.go index f0f4a6784..415a2a1e1 100644 --- a/xds/internal/clients/xdsclient/xdsclient.go +++ b/xds/internal/clients/xdsclient/xdsclient.go @@ -44,6 +44,7 @@ import ( "google.golang.org/grpc/xds/internal/clients/internal/backoff" "google.golang.org/grpc/xds/internal/clients/internal/syncutil" "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + "google.golang.org/grpc/xds/internal/clients/xdsclient/metrics" "google.golang.org/protobuf/proto" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" @@ -82,8 +83,9 @@ type XDSClient struct { resourceTypes map[string]ResourceType // Registry of resource types, for parsing incoming ADS responses. 
serializer *syncutil.CallbackSerializer // Serializer for invoking resource watcher callbacks. serializerClose func() // Function to close the serializer. - logger *grpclog.PrefixLogger // Logger for this client. - target string // The gRPC target for this client. + logger *grpclog.PrefixLogger // Logger for this client. + target string // The gRPC target for this client. + metricsReporter clients.MetricsReporter // Reporter for metrics; nil disables reporting. // The XDSClient owns a bunch of channels to individual xDS servers // specified in the xDS client configuration. Authorities acquire references @@ -138,6 +140,7 @@ func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff f transportBuilder: config.TransportBuilder, resourceTypes: config.ResourceTypes, xdsActiveChannels: make(map[ServerConfig]*channelState), + metricsReporter: config.MetricsReporter, } for name, cfg := range config.Authorities { @@ -154,6 +157,7 @@ func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff f getChannelForADS: c.getChannelForADS, logPrefix: clientPrefix(c), target: target, + metricsReporter: c.metricsReporter, }) } c.topLevelAuthority = newAuthority(authorityBuildOptions{ @@ -163,8 +167,10 @@ func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff f getChannelForADS: c.getChannelForADS, logPrefix: clientPrefix(c), target: target, + metricsReporter: c.metricsReporter, }) c.logger = prefixLogger(c) + return c, nil } @@ -384,6 +390,12 @@ func (cs *channelState) adsStreamFailure(err error) { return } + if xdsresource.ErrType(err) != xdsresource.ErrTypeStreamFailedAfterRecv && cs.parent.metricsReporter != nil { + cs.parent.metricsReporter.ReportMetric(&metrics.ServerFailure{ + ServerURI: cs.serverConfig.ServerIdentifier.ServerURI, + }) + } + cs.parent.channelsMu.Lock() defer cs.parent.channelsMu.Unlock() for authority := range cs.interestedAuthorities { diff --git a/xds/internal/clients/xdsclient/xdsconfig.go b/xds/internal/clients/xdsclient/xdsconfig.go index 4c7d090df..a276309df 100644 --- 
a/xds/internal/clients/xdsclient/xdsconfig.go +++ b/xds/internal/clients/xdsclient/xdsconfig.go @@ -56,6 +56,10 @@ type Config struct { // // For example: "type.googleapis.com/envoy.config.listener.v3.Listener" ResourceTypes map[string]ResourceType + + // MetricsReporter is used to report registered metrics. If unset, no + // metrics will be reported. + MetricsReporter clients.MetricsReporter } // ServerConfig contains configuration for an xDS management server.