Compare commits

...

7 Commits

29 changed files with 1971 additions and 457 deletions

View File

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
@ -72,6 +73,8 @@ var (
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
// PickFirstBalancerName is the name of the pick_first balancer.
PickFirstBalancerName = pickfirst.Name
)
// The following errors are returned from Dial and DialContext

View File

@ -0,0 +1,38 @@
# CSM Observability
This example shows how to configure CSM Observability for gRPC client and
server applications (configured once per binary), and shows what type of
telemetry data it can produce for certain RPCs with additional CSM Labels. The
gRPC client accepts configuration from an xDS control plane, since the default
address that it connects to is "xds:///helloworld:50051"; this can be
overridden with the command line flag --target. This can be plugged into
the steps outlined in the CSM Observability User Guide.
## Try it (locally, if the xDS address is overridden)
```
go run server/main.go
```
```
go run client/main.go
```
Curl to the port where Prometheus exporter is outputting metrics data:
```
curl localhost:9464/metrics
```
# Building
From the grpc-go directory:
Client:
docker build -t <TAG> -f examples/features/csm_observability/client/Dockerfile .
Server:
docker build -t <TAG> -f examples/features/csm_observability/server/Dockerfile .
Note that this example will not work by default, as the client uses an xDS
Scheme and thus needs xDS Resources to connect to the server. Deploy the built
client and server containers within Cloud Service Mesh in order for this example
to work, or overwrite target to point to :<server serving port>.

View File

@ -0,0 +1,35 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Dockerfile for building the example client. To build the image, run the
# following command from grpc-go directory:
# docker build -t <TAG> -f examples/features/csm_observability/client/Dockerfile .
FROM golang:1.21-alpine as build
RUN apk --no-cache add curl
# Make a grpc-go directory and copy the repo into it.
WORKDIR /go/src/grpc-go
COPY . .
# Build a static binary without cgo so that we can copy just the binary in the
# final image, and can get rid of the Go compiler and gRPC-Go dependencies.
RUN cd examples/features/csm_observability/client && go build -tags osusergo,netgo .
FROM alpine
RUN apk --no-cache add curl
COPY --from=build /go/src/grpc-go/examples/features/csm_observability/client/client .
ENV GRPC_GO_LOG_VERBOSITY_LEVEL=99
ENV GRPC_GO_LOG_SEVERITY_LEVEL="info"
ENTRYPOINT ["./client"]

View File

@ -0,0 +1,77 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/csm"
_ "google.golang.org/grpc/xds" // To install the xds resolvers and balancers.
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
)
var (
// target is the server address the client dials. It defaults to an xDS
// scheme target, so an xDS control plane is expected unless overridden.
target = flag.String("target", "xds:///helloworld:50051", "the server address to connect to")
// prometheusEndpoint is the address the Prometheus metrics HTTP handler
// listens on.
prometheusEndpoint = flag.String("prometheus_endpoint", ":9464", "the Prometheus exporter endpoint")
)
// main starts a Prometheus exporter for OpenTelemetry metrics, enables CSM
// observability, and then issues a UnaryEcho RPC against *target once per
// second forever so that telemetry is continuously emitted.
func main() {
	flag.Parse()
	exporter, err := prometheus.New()
	if err != nil {
		log.Fatalf("Failed to start prometheus exporter: %v", err)
	}
	provider := metric.NewMeterProvider(metric.WithReader(exporter))
	go func() {
		// http.ListenAndServe always returns a non-nil error; log it instead
		// of silently losing the metrics endpoint.
		if err := http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()); err != nil {
			log.Printf("Prometheus exporter HTTP server exited: %v", err)
		}
	}()
	cleanup := csm.EnableObservability(context.Background(), opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}})
	defer cleanup()
	cc, err := grpc.NewClient(*target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to start NewClient: %v", err)
	}
	defer cc.Close()
	c := echo.NewEchoClient(cc)
	// Make a RPC every second. This should trigger telemetry to be emitted from
	// the client and the server. The per-iteration closure lets cancel() run
	// via defer at the end of each iteration rather than piling up.
	for {
		func() {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
			defer cancel()
			r, err := c.UnaryEcho(ctx, &echo.EchoRequest{Message: "this is examples/opentelemetry"})
			if err != nil {
				// Don't print a nil response on failure; the original did.
				log.Printf("UnaryEcho failed: %v", err)
				return
			}
			fmt.Println(r)
		}()
		time.Sleep(time.Second)
	}
}

View File

@ -0,0 +1,34 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Dockerfile for building the example server. To build the image, run the
# following command from grpc-go directory:
# docker build -t <TAG> -f examples/features/csm_observability/server/Dockerfile .
FROM golang:1.21-alpine as build
RUN apk --no-cache add curl
# Make a grpc-go directory and copy the repo into it.
WORKDIR /go/src/grpc-go
COPY . .
# Build a static binary without cgo so that we can copy just the binary in the
# final image, and can get rid of the Go compiler and gRPC-Go dependencies.
RUN cd examples/features/csm_observability/server && go build -tags osusergo,netgo .
FROM alpine
RUN apk --no-cache add curl
COPY --from=build /go/src/grpc-go/examples/features/csm_observability/server/server .
ENV GRPC_GO_LOG_VERBOSITY_LEVEL=99
ENV GRPC_GO_LOG_SEVERITY_LEVEL="info"
ENTRYPOINT ["./server"]

View File

@ -0,0 +1,77 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/csm"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
)
var (
	// port is the port the Echo server listens on. (The previous description
	// incorrectly called it an address to connect to.)
	port = flag.String("port", "50051", "the port to serve on")
	// prometheusEndpoint is the address the Prometheus metrics HTTP handler
	// listens on.
	prometheusEndpoint = flag.String("prometheus_endpoint", ":9464", "the Prometheus exporter endpoint")
)
// echoServer implements the Echo service, tagging each reply with the
// address this server instance was configured with.
type echoServer struct {
	pb.UnimplementedEchoServer
	addr string
}

// UnaryEcho echoes the request message back to the caller, suffixed with
// this server's address.
func (s *echoServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	msg := fmt.Sprintf("%s (from %s)", req.Message, s.addr)
	return &pb.EchoResponse{Message: msg}, nil
}
// main starts a Prometheus exporter for OpenTelemetry metrics, enables CSM
// observability, and serves the Echo service on *port until terminated.
func main() {
	flag.Parse()
	exporter, err := prometheus.New()
	if err != nil {
		log.Fatalf("Failed to start prometheus exporter: %v", err)
	}
	provider := metric.NewMeterProvider(metric.WithReader(exporter))
	go func() {
		// http.ListenAndServe always returns a non-nil error; log it instead
		// of silently losing the metrics endpoint.
		if err := http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()); err != nil {
			log.Printf("Prometheus exporter HTTP server exited: %v", err)
		}
	}()
	cleanup := csm.EnableObservability(context.Background(), opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}})
	defer cleanup()
	lis, err := net.Listen("tcp", ":"+*port)
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &echoServer{addr: ":" + *port})
	log.Printf("Serving on %s\n", *port)
	// Serve blocks until the listener fails or the server is stopped.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

View File

@ -4,10 +4,14 @@ go 1.21
require (
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b
github.com/prometheus/client_golang v1.19.1
go.opentelemetry.io/otel/exporters/prometheus v0.49.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
golang.org/x/oauth2 v0.20.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157
google.golang.org/grpc v1.64.0
google.golang.org/grpc/gcp/observability v1.0.1
google.golang.org/grpc/stats/opentelemetry v0.0.0-20240604165302-6d236200ea68
google.golang.org/protobuf v1.34.1
)
@ -22,6 +26,7 @@ require (
cloud.google.com/go/monitoring v1.19.0 // indirect
cloud.google.com/go/trace v1.10.7 // indirect
contrib.go.opencensus.io/exporter/stackdriver v0.13.15-0.20230702191903-2de6d2748484 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.27.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.16 // indirect
@ -35,6 +40,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
@ -48,11 +54,16 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.4 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.27.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
@ -67,3 +78,5 @@ require (
)
replace google.golang.org/grpc => ../
replace google.golang.org/grpc/stats/opentelemetry => ../stats/opentelemetry

View File

@ -774,6 +774,8 @@ gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zum
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 h1:yRhWveg9NbJcJYoJL4FoSauT2dxnt4N9MIAJ7tvU/mQ=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0/go.mod h1:p2puVVSKjQ84Qb1gzw2XHLs34WQyHTYFZLaVxypAFYs=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY=
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
@ -811,6 +813,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 h1:69tpbPED7jKPyzMcrwSvhWcJ9bP
github.com/aws/aws-sdk-go-v2/service/sts v1.28.10/go.mod h1:0Aqn1MnEuitqfsCNyKsdKLhDUOr4txD/g19EfiUqgws=
github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -1041,10 +1045,18 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE=
github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek=
github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@ -1089,16 +1101,22 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/detectors/gcp v1.27.0 h1:eVfDeFAPnMFZUhNdDZ/BbpEmC7/xxDKTSba5NhJH88s=
go.opentelemetry.io/contrib/detectors/gcp v1.27.0/go.mod h1:amd+4uZxqJAUx7zI1JvygUtAc2EVWtQeyz8D+3161SQ=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 h1:9l89oX4ba9kHbBol3Xin3leYJ+252h0zszDtBwyKe2A=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0/go.mod h1:XLZfZboOJWHNKUv7eH0inh0E9VV6eWDFB/9yJyTLPp0=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/exporters/prometheus v0.49.0 h1:Er5I1g/YhfYv9Affk9nJLfH/+qCCVVg1f2R9AbJfqDQ=
go.opentelemetry.io/otel/exporters/prometheus v0.49.0/go.mod h1:KfQ1wpjf3zsHjzP149P4LyAwWRupc6c7t1ZJ9eXpKQM=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI=
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI=
go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=

View File

@ -63,6 +63,8 @@ var (
// init registers the DNS resolver builder and wires this package's
// test-override hooks to their production implementations.
func init() {
resolver.Register(NewBuilder())
// Tests replace these time hooks to control the resolver's waiting
// behavior without real sleeps.
internal.TimeAfterFunc = time.After
internal.TimeNowFunc = time.Now
internal.TimeUntilFunc = time.Until
internal.NewNetResolver = newNetResolver
internal.AddressDialer = addressDialer
}
@ -209,12 +211,12 @@ func (d *dnsResolver) watcher() {
err = d.cc.UpdateState(*state)
}
var waitTime time.Duration
var nextResolutionTime time.Time
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
waitTime = MinResolutionInterval
nextResolutionTime = internal.TimeNowFunc().Add(MinResolutionInterval)
select {
case <-d.ctx.Done():
return
@ -223,13 +225,13 @@ func (d *dnsResolver) watcher() {
} else {
// Poll on an error found in DNS Resolver or an error received from
// ClientConn.
waitTime = backoff.DefaultExponential.Backoff(backoffIndex)
nextResolutionTime = internal.TimeNowFunc().Add(backoff.DefaultExponential.Backoff(backoffIndex))
backoffIndex++
}
select {
case <-d.ctx.Done():
return
case <-internal.TimeAfterFunc(waitTime):
case <-internal.TimeAfterFunc(internal.TimeUntilFunc(nextResolutionTime)):
}
}
}

View File

@ -102,6 +102,27 @@ func overrideTimeAfterFuncWithChannel(t *testing.T) (durChan chan time.Duration,
return durChan, timeChan
}
// overrideTimeNowFunc pins the DNS resolver's notion of the current time to
// the given value for the duration of the test, restoring the previous hook
// on cleanup.
func overrideTimeNowFunc(t *testing.T, now time.Time) {
	prev := dnsinternal.TimeNowFunc
	t.Cleanup(func() { dnsinternal.TimeNowFunc = prev })
	dnsinternal.TimeNowFunc = func() time.Time {
		return now
	}
}
// overrideTimeUntilFuncWithChannel replaces the DNS resolver's time.Until
// hook with one that publishes the target time on the returned channel and
// reports zero remaining wait, so tests can observe the deadline the
// resolver computed without actually sleeping. The original hook is
// restored on test cleanup.
//
// The original version declared a named result (timeChan) that was never
// used — a locally created timeCh shadowed it; the unnamed result removes
// that confusion without changing the signature seen by callers.
func overrideTimeUntilFuncWithChannel(t *testing.T) chan time.Time {
	timeCh := make(chan time.Time, 1)
	origTimeUntil := dnsinternal.TimeUntilFunc
	dnsinternal.TimeUntilFunc = func(t time.Time) time.Duration {
		timeCh <- t
		return 0
	}
	t.Cleanup(func() { dnsinternal.TimeUntilFunc = origTimeUntil })
	return timeCh
}
func enableSRVLookups(t *testing.T) {
origEnableSRVLookups := dns.EnableSRVLookups
dns.EnableSRVLookups = true
@ -1290,3 +1311,60 @@ func (s) TestMinResolutionInterval(t *testing.T) {
r.ResolveNow(resolver.ResolveNowOptions{})
}
}
// TestMinResolutionInterval_NoExtraDelay verifies that there is no extra delay
// between two resolution requests apart from [MinResolutionInterval].
func (s) TestMinResolutionInterval_NoExtraDelay(t *testing.T) {
// Fake DNS backend: one A/AAAA entry and one TXT service-config record.
tr := &testNetResolver{
hostLookupTable: map[string][]string{
"foo.bar.com": {"1.2.3.4", "5.6.7.8"},
},
txtLookupTable: map[string][]string{
"_grpc_config.foo.bar.com": txtRecordServiceConfig(txtRecordGood),
},
}
overrideNetResolver(t, tr)
// Override time.Now() to return a zero value for time. This will allow us
// to verify that the call to time.Until is made with the exact
// [MinResolutionInterval] that we expect.
overrideTimeNowFunc(t, time.Time{})
// Override time.Until() to read the time passed to it
// and return immediately without any delay
timeCh := overrideTimeUntilFuncWithChannel(t)
r, stateCh, errorCh := buildResolverWithTestClientConn(t, "foo.bar.com")
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Ensure that the first resolution happens.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver")
case err := <-errorCh:
t.Fatalf("Unexpected error from resolver, %v", err)
case <-stateCh:
}
// Request re-resolution and verify that the resolver waits for
// [MinResolutionInterval].
r.ResolveNow(resolver.ResolveNowOptions{})
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver")
case gotTime := <-timeCh:
// Since "now" is pinned to the zero time, the deadline the resolver
// passes to time.Until must be exactly zero + MinResolutionInterval.
wantTime := time.Time{}.Add(dns.MinResolutionInterval)
if !gotTime.Equal(wantTime) {
t.Fatalf("DNS resolver waits for %v time before re-resolution, want %v", gotTime, wantTime)
}
}
// Ensure that the re-resolution request actually happens.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for an error from the resolver")
case err := <-errorCh:
t.Fatalf("Unexpected error from resolver, %v", err)
case <-stateCh:
}
}

View File

@ -51,11 +51,22 @@ var (
// The following vars are overridden from tests.
var (
// TimeAfterFunc is used by the DNS resolver to wait for the given duration
// to elapse. In non-test code, this is implemented by time.After. In test
// to elapse. In non-test code, this is implemented by time.After. In test
// code, this can be used to control the amount of time the resolver is
// blocked waiting for the duration to elapse.
TimeAfterFunc func(time.Duration) <-chan time.Time
// TimeNowFunc is used by the DNS resolver to get the current time.
// In non-test code, this is implemented by time.Now. In test code,
// this can be used to control the current time for the resolver.
TimeNowFunc func() time.Time
// TimeUntilFunc is used by the DNS resolver to calculate the remaining
// wait time for re-resolution. In non-test code, this is implemented by
// time.Until. In test code, this can be used to control the remaining
// time for resolver to wait for re-resolution.
TimeUntilFunc func(time.Time) time.Duration
// NewNetResolver returns the net.Resolver instance for the given target.
NewNetResolver func(string) (NetResolver, error)

View File

@ -90,21 +90,6 @@ func Pairs(kv ...string) MD {
return md
}
// String implements the Stringer interface for pretty-printing a MD as
// "MD{key=[v1, v2], ...}". Ordering of the entries is non-deterministic
// because it ranges over a map.
func (md MD) String() string {
	entries := make([]string, 0, len(md))
	for k, v := range md {
		entries = append(entries, fmt.Sprintf("%s=[%s]", k, strings.Join(v, ", ")))
	}
	return "MD{" + strings.Join(entries, ", ") + "}"
}
// Len returns the number of items in md.
func (md MD) Len() int {
return len(md)

View File

@ -22,7 +22,6 @@ import (
"context"
"reflect"
"strconv"
"strings"
"testing"
"time"
@ -339,26 +338,6 @@ func (s) TestAppendToOutgoingContext_FromKVSlice(t *testing.T) {
}
}
// TestStringerMD verifies that MD.String produces an "MD{k=[v, ...], ...}"
// rendering containing every key/value pair. Because map iteration order is
// non-deterministic, only substring containment is asserted, not the full
// string for multi-key inputs.
func TestStringerMD(t *testing.T) {
for _, test := range []struct {
md MD
want []string
}{
{MD{}, []string{"MD{}"}},
{MD{"k1": []string{}}, []string{"MD{k1=[]}"}},
{MD{"k1": []string{"v1", "v2"}}, []string{"MD{k1=[v1, v2]}"}},
{MD{"k1": []string{"v1"}}, []string{"MD{k1=[v1]}"}},
{MD{"k1": []string{"v1", "v2"}, "k2": []string{}, "k3": []string{"1", "2", "3"}}, []string{"MD{", "k1=[v1, v2]", "k2=[]", "k3=[1, 2, 3]", "}"}},
} {
got := test.md.String()
for _, want := range test.want {
if !strings.Contains(got, want) {
t.Fatalf("Metadata string %q is missing %q", got, want)
}
}
}
}
// Old/slow approach to adding metadata to context
func Benchmark_AddingMetadata_ContextManipulationApproach(b *testing.B) {
// TODO: Add in N=1-100 tests once Go1.6 support is removed.

View File

@ -63,7 +63,7 @@ func (h *clientStatsHandler) initializeMetrics() {
func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: h.determineTarget(cc),
target: cc.CanonicalTarget(),
method: h.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)
@ -83,17 +83,6 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
return err
}
// determineTarget returns the target string to record metric attributes
// with: the channel's canonical target, unless a configured
// TargetAttributeFilter rejects it, in which case "other" is returned.
func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
	target := cc.CanonicalTarget()
	if filter := h.options.MetricsOptions.TargetAttributeFilter; filter != nil && !filter(target) {
		return "other"
	}
	return target
}
// determineMethod determines the method to record attributes with. This will be
// "other" if StaticMethod isn't specified or if method filter is set and
// specifies, the method name as is otherwise.
@ -108,7 +97,7 @@ func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOpt
func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ci := &callInfo{
target: h.determineTarget(cc),
target: cc.CanonicalTarget(),
method: h.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

View File

@ -0,0 +1,611 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package csm
import (
"context"
"errors"
"io"
"os"
"testing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/internal/testutils"
)
// setupEnv configures the environment for CSM Observability Testing. It sets
// the bootstrap env var to a bootstrap file with a nodeID provided. It sets CSM
// Env Vars as well, and mocks the resource detector's returned attribute set to
// simulate the environment. It registers a cleanup function on the provided t
// to restore the environment to its original state.
func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID, csmCanonicalServiceName, csmWorkloadName string) {
cleanup, err := bootstrap.CreateFile(bootstrap.Options{
NodeID: nodeID,
ServerURI: "xds_server_uri",
})
if err != nil {
t.Fatalf("Failed to create bootstrap: %v", err)
}
// Remember whether each CSM env var was previously set so cleanup can
// either restore the old value or unset it entirely.
oldCSMCanonicalServiceName, csmCanonicalServiceNamePresent := os.LookupEnv("CSM_CANONICAL_SERVICE_NAME")
oldCSMWorkloadName, csmWorkloadNamePresent := os.LookupEnv("CSM_WORKLOAD_NAME")
os.Setenv("CSM_CANONICAL_SERVICE_NAME", csmCanonicalServiceName)
os.Setenv("CSM_WORKLOAD_NAME", csmWorkloadName)
var attributes []attribute.KeyValue
for k, v := range resourceDetectorEmissions {
attributes = append(attributes, attribute.String(k, v))
}
// Return the attributes configured as part of the test in place
// of reading from resource.
attrSet := attribute.NewSet(attributes...)
origGetAttrSet := getAttrSetFromResourceDetector
getAttrSetFromResourceDetector = func(context.Context) *attribute.Set {
return &attrSet
}
t.Cleanup(func() {
cleanup()
if csmCanonicalServiceNamePresent {
os.Setenv("CSM_CANONICAL_SERVICE_NAME", oldCSMCanonicalServiceName)
} else {
os.Unsetenv("CSM_CANONICAL_SERVICE_NAME")
}
if csmWorkloadNamePresent {
os.Setenv("CSM_WORKLOAD_NAME", oldCSMWorkloadName)
} else {
os.Unsetenv("CSM_WORKLOAD_NAME")
}
getAttrSetFromResourceDetector = origGetAttrSet
})
}
// TestCSMPluginOptionUnary tests the CSM Plugin Option and labels. It
// configures the environment for the CSM Plugin Option to read from. It then
// configures a system with a gRPC Client and gRPC server with the OpenTelemetry
// Dial and Server Option configured with a CSM Plugin Option with a certain
// unary handler set to induce different ways of setting metadata exchange
// labels, and makes a Unary RPC. This RPC should cause certain recording for
// each registered metric observed through a Manual Metrics Reader on the
// provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the
// plugin option should be attached to the relevant metrics.
func (s) TestCSMPluginOptionUnary(t *testing.T) {
	// Resource detector attributes that the CSM plugin option reads and
	// translates into "csm.remote_workload_*" metric labels.
	resourceDetectorEmissions := map[string]string{
		"cloud.platform":     "gcp_kubernetes_engine",
		"cloud.region":       "cloud_region_val", // availability_zone isn't present, so this should become location
		"cloud.account.id":   "cloud_account_id_val",
		"k8s.namespace.name": "k8s_namespace_name_val",
		"k8s.cluster.name":   "k8s_cluster_name_val",
	}
	const nodeID = "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa"
	const csmCanonicalServiceName = "csm_canonical_service_name"
	const csmWorkloadName = "csm_workload_name"
	setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName)

	attributesWant := map[string]string{
		"csm.workload_canonical_service": csmCanonicalServiceName, // from env
		"csm.mesh_id":                    "mesh_id",               // from bootstrap env var

		// No xDS Labels - this happens in a test below.

		"csm.remote_workload_type":              "gcp_kubernetes_engine",
		"csm.remote_workload_canonical_service": csmCanonicalServiceName,
		"csm.remote_workload_project_id":        "cloud_account_id_val",
		"csm.remote_workload_cluster_name":      "k8s_cluster_name_val",
		"csm.remote_workload_namespace_name":    "k8s_namespace_name_val",
		"csm.remote_workload_location":          "cloud_region_val",
		"csm.remote_workload_name":              csmWorkloadName,
	}

	var csmLabels []attribute.KeyValue
	for k, v := range attributesWant {
		csmLabels = append(csmLabels, attribute.String(k, v))
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	tests := []struct {
		name string
		// To test the different operations for Unary RPC's from the interceptor
		// level that can plumb metadata exchange header in.
		unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
		opts          testutils.MetricDataOptions
	}{
		{
			name: "normal-flow",
			unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
				return &testpb.SimpleResponse{Payload: &testpb.Payload{
					Body: make([]byte, len(in.GetPayload().GetBody())),
				}}, nil
			},
			opts: testutils.MetricDataOptions{
				CSMLabels:                  csmLabels,
				UnaryCompressedMessageSize: float64(57),
			},
		},
		{
			name: "trailers-only",
			unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
				return nil, errors.New("some error") // return an error and no message - this triggers trailers only - no messages or headers sent
			},
			opts: testutils.MetricDataOptions{
				CSMLabels:       csmLabels,
				UnaryCallFailed: true,
			},
		},
		{
			name: "set-header",
			unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
				// Error intentionally ignored; only the metadata-exchange side
				// effect on the wire is of interest here.
				grpc.SetHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"}))

				return &testpb.SimpleResponse{Payload: &testpb.Payload{
					Body: make([]byte, len(in.GetPayload().GetBody())),
				}}, nil
			},
			opts: testutils.MetricDataOptions{
				CSMLabels:                  csmLabels,
				UnaryCompressedMessageSize: float64(57),
			},
		},
		{
			name: "send-header",
			unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
				// Error intentionally ignored; only the metadata-exchange side
				// effect on the wire is of interest here.
				grpc.SendHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"}))

				return &testpb.SimpleResponse{Payload: &testpb.Payload{
					Body: make([]byte, len(in.GetPayload().GetBody())),
				}}, nil
			},
			opts: testutils.MetricDataOptions{
				CSMLabels:                  csmLabels,
				UnaryCompressedMessageSize: float64(57),
			},
		},
		{
			name: "send-msg",
			unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
				return &testpb.SimpleResponse{Payload: &testpb.Payload{
					Body: make([]byte, len(in.GetPayload().GetBody())),
				}}, nil
			},
			opts: testutils.MetricDataOptions{
				CSMLabels:                  csmLabels,
				UnaryCompressedMessageSize: float64(57),
			},
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			reader := metric.NewManualReader()
			provider := metric.NewMeterProvider(metric.WithReader(reader))
			ss := &stubserver.StubServer{UnaryCallF: test.unaryCallFunc}
			po := newPluginOption(ctx)
			sopts := []grpc.ServerOption{
				serverOptionWithCSMPluginOption(opentelemetry.Options{
					MetricsOptions: opentelemetry.MetricsOptions{
						MeterProvider: provider,
						Metrics:       opentelemetry.DefaultMetrics,
					}}, po),
			}
			dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
				MetricsOptions: opentelemetry.MetricsOptions{
					MeterProvider:  provider,
					Metrics:        opentelemetry.DefaultMetrics,
					OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
				},
			}, po)}
			if err := ss.Start(sopts, dopts...); err != nil {
				t.Fatalf("Error starting endpoint server: %v", err)
			}
			defer ss.Stop()

			var request *testpb.SimpleRequest
			if test.opts.UnaryCompressedMessageSize != 0 {
				request = &testpb.SimpleRequest{Payload: &testpb.Payload{
					Body: make([]byte, 10000),
				}}
			}

			// Make a Unary RPC. These should cause certain metrics to be
			// emitted, which should be able to be observed through the Metric
			// Reader. Verify the RPC outcome matches what the test case
			// expects so the metric assertions below are meaningful.
			_, err := ss.Client.UnaryCall(ctx, request, grpc.UseCompressor(gzip.Name))
			if gotErr := err != nil; gotErr != test.opts.UnaryCallFailed {
				t.Fatalf("ss.Client.UnaryCall returned error: %v, want failure: %v", err, test.opts.UnaryCallFailed)
			}
			rm := &metricdata.ResourceMetrics{}
			if err := reader.Collect(ctx, rm); err != nil {
				t.Fatalf("reader.Collect failed: %v", err)
			}
			gotMetrics := map[string]metricdata.Metrics{}
			for _, sm := range rm.ScopeMetrics {
				for _, m := range sm.Metrics {
					gotMetrics[m.Name] = m
				}
			}

			opts := test.opts
			opts.Target = ss.Target
			wantMetrics := testutils.MetricDataUnary(opts)
			testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics)
		})
	}
}
// TestCSMPluginOptionStreaming tests the CSM Plugin Option and labels. It
// configures the environment for the CSM Plugin Option to read from. It then
// configures a system with a gRPC Client and gRPC server with the OpenTelemetry
// Dial and Server Option configured with a CSM Plugin Option with a certain
// streaming handler set to induce different ways of setting metadata exchange
// labels, and makes a Streaming RPC. This RPC should cause certain recording
// for each registered metric observed through a Manual Metrics Reader on the
// provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the
// plugin option should be attached to the relevant metrics.
func (s) TestCSMPluginOptionStreaming(t *testing.T) {
	// Resource detector attributes that the CSM plugin option reads and
	// translates into "csm.remote_workload_*" metric labels.
	resourceDetectorEmissions := map[string]string{
		"cloud.platform":     "gcp_kubernetes_engine",
		"cloud.region":       "cloud_region_val", // availability_zone isn't present, so this should become location
		"cloud.account.id":   "cloud_account_id_val",
		"k8s.namespace.name": "k8s_namespace_name_val",
		"k8s.cluster.name":   "k8s_cluster_name_val",
	}
	const nodeID = "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa"
	const csmCanonicalServiceName = "csm_canonical_service_name"
	const csmWorkloadName = "csm_workload_name"
	setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName)

	attributesWant := map[string]string{
		"csm.workload_canonical_service": csmCanonicalServiceName, // from env
		"csm.mesh_id":                    "mesh_id",               // from bootstrap env var

		// No xDS Labels - this happens in a test below.

		"csm.remote_workload_type":              "gcp_kubernetes_engine",
		"csm.remote_workload_canonical_service": csmCanonicalServiceName,
		"csm.remote_workload_project_id":        "cloud_account_id_val",
		"csm.remote_workload_cluster_name":      "k8s_cluster_name_val",
		"csm.remote_workload_namespace_name":    "k8s_namespace_name_val",
		"csm.remote_workload_location":          "cloud_region_val",
		"csm.remote_workload_name":              csmWorkloadName,
	}

	var csmLabels []attribute.KeyValue
	for k, v := range attributesWant {
		csmLabels = append(csmLabels, attribute.String(k, v))
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	tests := []struct {
		name string
		// To test the different operations for Streaming RPC's from the
		// interceptor level that can plumb metadata exchange header in.
		streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error
		opts              testutils.MetricDataOptions
	}{
		{
			name: "trailers-only",
			streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error {
				for {
					if _, err := stream.Recv(); err == io.EOF {
						return nil
					}
				}
			},
			opts: testutils.MetricDataOptions{
				CSMLabels: csmLabels,
			},
		},
		{
			name: "set-header",
			streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error {
				// Error intentionally ignored; only the metadata-exchange side
				// effect on the wire is of interest here.
				stream.SetHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"}))

				for {
					if _, err := stream.Recv(); err == io.EOF {
						return nil
					}
				}
			},
			opts: testutils.MetricDataOptions{
				CSMLabels: csmLabels,
			},
		},
		{
			name: "send-header",
			streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error {
				// Error intentionally ignored; only the metadata-exchange side
				// effect on the wire is of interest here.
				stream.SendHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"}))

				for {
					if _, err := stream.Recv(); err == io.EOF {
						return nil
					}
				}
			},
			opts: testutils.MetricDataOptions{
				CSMLabels: csmLabels,
			},
		},
		{
			name: "send-msg",
			streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error {
				stream.Send(&testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{
					Body: make([]byte, 10000),
				}})
				for {
					if _, err := stream.Recv(); err == io.EOF {
						return nil
					}
				}
			},
			opts: testutils.MetricDataOptions{
				CSMLabels:                      csmLabels,
				StreamingCompressedMessageSize: float64(57),
			},
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			reader := metric.NewManualReader()
			provider := metric.NewMeterProvider(metric.WithReader(reader))
			ss := &stubserver.StubServer{FullDuplexCallF: test.streamingCallFunc}
			po := newPluginOption(ctx)
			sopts := []grpc.ServerOption{
				serverOptionWithCSMPluginOption(opentelemetry.Options{
					MetricsOptions: opentelemetry.MetricsOptions{
						MeterProvider: provider,
						Metrics:       opentelemetry.DefaultMetrics,
					}}, po),
			}
			dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
				MetricsOptions: opentelemetry.MetricsOptions{
					MeterProvider:  provider,
					Metrics:        opentelemetry.DefaultMetrics,
					OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
				},
			}, po)}
			if err := ss.Start(sopts, dopts...); err != nil {
				t.Fatalf("Error starting endpoint server: %v", err)
			}
			defer ss.Stop()

			stream, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name))
			if err != nil {
				t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
			}

			if test.opts.StreamingCompressedMessageSize != 0 {
				if err := stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{
					Body: make([]byte, 10000),
				}}); err != nil {
					t.Fatalf("stream.Send failed: %v", err)
				}
				if _, err := stream.Recv(); err != nil {
					t.Fatalf("stream.Recv failed with error: %v", err)
				}
			}

			stream.CloseSend()
			if _, err = stream.Recv(); err != io.EOF {
				t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err)
			}

			rm := &metricdata.ResourceMetrics{}
			if err := reader.Collect(ctx, rm); err != nil {
				t.Fatalf("reader.Collect failed: %v", err)
			}
			gotMetrics := map[string]metricdata.Metrics{}
			for _, sm := range rm.ScopeMetrics {
				for _, m := range sm.Metrics {
					gotMetrics[m.Name] = m
				}
			}

			opts := test.opts
			opts.Target = ss.Target
			wantMetrics := testutils.MetricDataStreaming(opts)
			testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics)
		})
	}
}
// unaryInterceptorAttachXDSLabels is a client-side unary interceptor that
// injects the "csm." telemetry labels the xDS cluster_impl picker would
// normally provide, so label emission can be tested without a full xDS setup.
func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	labels := &istats.Labels{
		TelemetryLabels: map[string]string{
			// mock what the cluster impl would write here ("csm." xDS Labels)
			"csm.service_name":           "service_name_val",
			"csm.service_namespace_name": "service_namespace_val",
		},
	}

	// TagRPC will just see this in the context and set its xDS Labels to
	// point to this map on the heap.
	return invoker(istats.SetLabels(ctx, labels), method, req, reply, cc, opts...)
}
// TestXDSLabels tests that xDS Labels get emitted from OpenTelemetry metrics.
// This test configures OpenTelemetry with the CSM Plugin Option, and xDS
// Optional Labels turned on. It then configures an interceptor to attach
// labels, representing the cluster_impl picker. It then makes a unary RPC, and
// expects xDS Labels labels to be attached to emitted relevant metrics. Full
// xDS System alongside OpenTelemetry will be tested with interop. (there is
// a test for xDS -> Stats handler and this tests -> OTel -> emission).
func (s) TestXDSLabels(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	reader := metric.NewManualReader()
	provider := metric.NewMeterProvider(metric.WithReader(reader))
	// Echo server: responds with a payload the same size as the request's.
	ss := &stubserver.StubServer{
		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{Payload: &testpb.Payload{
				Body: make([]byte, len(in.GetPayload().GetBody())),
			}}, nil
		},
	}
	po := newPluginOption(ctx)
	dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
		MetricsOptions: opentelemetry.MetricsOptions{
			MeterProvider:  provider,
			Metrics:        opentelemetry.DefaultMetrics,
			OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
		},
	}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
	if err := ss.Start(nil, dopts...); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	// The RPC must succeed; otherwise the metric expectations below (which
	// assume an OK status and fixed compressed message sizes) are meaningless.
	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{
		Body: make([]byte, 10000),
	}}, grpc.UseCompressor(gzip.Name)); err != nil {
		t.Fatalf("ss.Client.UnaryCall failed: %v", err)
	}

	rm := &metricdata.ResourceMetrics{}
	if err := reader.Collect(ctx, rm); err != nil {
		t.Fatalf("reader.Collect failed: %v", err)
	}
	gotMetrics := map[string]metricdata.Metrics{}
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			gotMetrics[m.Name] = m
		}
	}

	unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
	targetAttr := attribute.String("grpc.target", ss.Target)
	unaryStatusAttr := attribute.String("grpc.status", "OK")

	// xDS labels attached by the interceptor above.
	serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
	serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
	// CSM labels fall back to "unknown" since the environment isn't set up.
	meshIDAttr := attribute.String("csm.mesh_id", "unknown")
	workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
	remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
	remoteWorkloadCanonicalServiceAttr := attribute.String("csm.remote_workload_canonical_service", "unknown")

	unaryMethodClientSideEnd := []attribute.KeyValue{
		unaryMethodAttr,
		targetAttr,
		unaryStatusAttr,
		serviceNameAttr,
		serviceNamespaceAttr,
		meshIDAttr,
		workloadCanonicalServiceAttr,
		remoteWorkloadTypeAttr,
		remoteWorkloadCanonicalServiceAttr,
	}

	unaryCompressedBytesSentRecv := int64(57) // Fixed 10000 bytes with gzip assumption.
	unaryBucketCounts := []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
	unaryExtrema := metricdata.NewExtrema(int64(57))
	wantMetrics := []metricdata.Metrics{
		{
			Name:        "grpc.client.attempt.started",
			Description: "Number of client call attempts started.",
			Unit:        "attempt",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		}, // Doesn't have xDS Labels, CSM Labels start from header or trailer from server, whichever comes first, so this doesn't need it
		{
			Name:        "grpc.client.attempt.duration",
			Description: "End-to-end time taken to complete a client call attempt.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(unaryMethodClientSideEnd...),
						Count:      1,
						Bounds:     testutils.DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.sent_total_compressed_message_size",
			Description: "Compressed message bytes sent per client call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(unaryMethodClientSideEnd...),
						Count:        1,
						Bounds:       testutils.DefaultSizeBounds,
						BucketCounts: unaryBucketCounts,
						Min:          unaryExtrema,
						Max:          unaryExtrema,
						Sum:          unaryCompressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.rcvd_total_compressed_message_size",
			Description: "Compressed message bytes received per call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(unaryMethodClientSideEnd...),
						Count:        1,
						Bounds:       testutils.DefaultSizeBounds,
						BucketCounts: unaryBucketCounts,
						Min:          unaryExtrema,
						Max:          unaryExtrema,
						Sum:          unaryCompressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.call.duration",
			Description: "Time taken by gRPC to complete an RPC from application's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr),
						Count:      1,
						Bounds:     testutils.DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
	}

	testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics)
}
// TestObservability tests that Observability global function compiles and runs
// without error. The actual functionality of this function will be verified in
// interop tests.
func (s) TestObservability(t *testing.T) {
	// Enable and immediately tear down; only compile/run sanity is checked.
	EnableObservability(context.Background(), opentelemetry.Options{})()
}

View File

@ -14,11 +14,10 @@
* limitations under the License.
*/
package opentelemetry
package opentelemetry_test
import (
"context"
"fmt"
"io"
"testing"
"time"
@ -29,6 +28,8 @@ import (
"google.golang.org/grpc/internal/stubserver"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/internal/testutils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
@ -46,35 +47,10 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// waitForServerCompletedRPCs waits until the unary and streaming stats.End
// calls are finished processing. It does this by waiting for the expected
// metric triggered by stats.End to appear through the passed in metrics reader.
func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
gotMetrics := map[string]metricdata.Metrics{}
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
gotMetrics[m.Name] = m
}
}
val, ok := gotMetrics[wantMetric.Name]
if !ok {
continue
}
if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
continue
}
return gotMetrics, nil
}
return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err())
}
// setup creates a stub server with OpenTelemetry component configured on client
// and server side. It returns a reader for metrics emitted from OpenTelemetry
// component and the server.
func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) {
func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) {
reader := metric.NewManualReader()
provider := metric.NewMeterProvider(
metric.WithReader(reader),
@ -82,7 +58,7 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Payload: &testpb.Payload{
Body: make([]byte, 10000),
Body: make([]byte, len(in.GetPayload().GetBody())),
}}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
@ -94,24 +70,16 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade
}
},
}
var taf func(string) bool
if tafOn {
taf = func(str string) bool {
return str != ss.Target
}
}
if err := ss.Start([]grpc.ServerOption{ServerOption(Options{
MetricsOptions: MetricsOptions{
if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: DefaultMetrics,
TargetAttributeFilter: taf,
MethodAttributeFilter: maf,
}})}, DialOption(Options{
MetricsOptions: MetricsOptions{
MeterProvider: provider,
Metrics: DefaultMetrics,
TargetAttributeFilter: taf,
MethodAttributeFilter: maf,
Metrics: opentelemetry.DefaultMetrics,
MethodAttributeFilter: methodAttributeFilter,
}})}, opentelemetry.DialOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
},
})); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
@ -119,20 +87,19 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade
return reader, ss
}
// TestMethodTargetAttributeFilter tests the method and target attribute filter.
// The method and target filter set should bucket the grpc.method/grpc.target
// attribute into "other" if filter specifies.
func (s) TestMethodTargetAttributeFilter(t *testing.T) {
// TestMethodAttributeFilter tests the method attribute filter. The method
// filter set should bucket the grpc.method attribute into "other" if the method
// attribute filter specifies.
func (s) TestMethodAttributeFilter(t *testing.T) {
maf := func(str string) bool {
// Will allow duplex/any other type of RPC.
return str != "/grpc.testing.TestService/UnaryCall"
return str != testpb.TestService_UnaryCall_FullMethodName
}
// pull out setup into a helper
reader, ss := setup(t, true, maf)
reader, ss := setup(t, maf)
defer ss.Stop()
// make a single RPC (unary rpc), and filter out the target and method
// that would correspond.
// Make a Unary and Streaming RPC. The Unary RPC should be filtered by the
// method attribute filter, and the Full Duplex (Streaming) RPC should not.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{
@ -147,10 +114,16 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) {
stream.CloseSend()
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err)
}
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
gotMetrics := map[string]metricdata.Metrics{}
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
gotMetrics[m.Name] = m
}
}
wantMetrics := []metricdata.Metrics{
{
@ -160,11 +133,11 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) {
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", "other")),
Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", ss.Target)),
Value: 1,
},
{
Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", "other")),
Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", ss.Target)),
Value: 1,
},
},
@ -172,54 +145,29 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) {
IsMonotonic: true,
},
},
}
gotMetrics := map[string]metricdata.Metrics{}
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
gotMetrics[m.Name] = m
}
{
Name: "grpc.server.call.duration",
Description: "End-to-end time taken to complete a call from server transport's perspective.",
Unit: "s",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{ // Method should go to "other" due to the method attribute filter.
Attributes: attribute.NewSet(attribute.String("grpc.method", "other"), attribute.String("grpc.status", "OK")),
Count: 1,
Bounds: testutils.DefaultLatencyBounds,
},
{
Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.status", "OK")),
Count: 1,
Bounds: testutils.DefaultLatencyBounds,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
}
for _, metric := range wantMetrics {
val, ok := gotMetrics[metric.Name]
if !ok {
t.Fatalf("metric %v not present in recorded metrics", metric.Name)
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
}
}
}
// assertDataPointWithinFiveSeconds asserts the metric passed in contains
// a histogram with dataPoints that fall within buckets that are <=5.
func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error {
histo, ok := metric.Data.(metricdata.Histogram[float64])
if !ok {
return fmt.Errorf("metric data is not histogram")
}
for _, dataPoint := range histo.DataPoints {
var boundWithFive int
for i, bucket := range dataPoint.Bounds {
if bucket >= 5 {
boundWithFive = i
}
}
foundPoint := false
for i, bucket := range dataPoint.BucketCounts {
if i >= boundWithFive {
return fmt.Errorf("data point not found in bucket <=5 seconds")
}
if bucket == 1 {
foundPoint = true
break
}
}
if !foundPoint {
return fmt.Errorf("no data point found for metric")
}
}
return nil
testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics)
}
// TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry
@ -232,7 +180,7 @@ func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error {
// on the Client (no StaticMethodCallOption set) and Server. The method
// attribute on subsequent metrics should be bucketed in "other".
func (s) TestAllMetricsOneFunction(t *testing.T) {
reader, ss := setup(t, false, nil)
reader, ss := setup(t, nil)
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@ -251,7 +199,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
stream.CloseSend()
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err)
}
rm := &metricdata.ResourceMetrics{}
@ -264,257 +212,11 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
}
}
unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
targetAttr := attribute.String("grpc.target", ss.Target)
statusAttr := attribute.String("grpc.status", "OK")
wantMetrics := []metricdata.Metrics{
{
Name: "grpc.client.attempt.started",
Description: "Number of client call attempts started.",
Unit: "attempt",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, targetAttr),
Value: 1,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, targetAttr),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
{
Name: "grpc.client.attempt.duration",
Description: "End-to-end time taken to complete a client call attempt.",
Unit: "s",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultLatencyBounds,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultLatencyBounds,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: "grpc.client.attempt.sent_total_compressed_message_size",
Description: "Compressed message bytes sent per client call attempt.",
Unit: "By",
Data: metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(57)),
Max: metricdata.NewExtrema(int64(57)),
Sum: 57,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(0)),
Max: metricdata.NewExtrema(int64(0)),
Sum: 0,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: "grpc.client.attempt.rcvd_total_compressed_message_size",
Description: "Compressed message bytes received per call attempt.",
Unit: "By",
Data: metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(57)),
Max: metricdata.NewExtrema(int64(57)),
Sum: 57,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(0)),
Max: metricdata.NewExtrema(int64(0)),
Sum: 0,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: "grpc.client.call.duration",
Description: "Time taken by gRPC to complete an RPC from application's perspective.",
Unit: "s",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultLatencyBounds,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
Count: 1,
Bounds: DefaultLatencyBounds,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: "grpc.server.call.started",
Description: "Number of server calls started.",
Unit: "call",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(unaryMethodAttr),
Value: 1,
},
{
Attributes: attribute.NewSet(duplexMethodAttr),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
{
Name: "grpc.server.call.sent_total_compressed_message_size",
Unit: "By",
Description: "Compressed message bytes sent per server call.",
Data: metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(57)),
Max: metricdata.NewExtrema(int64(57)),
Sum: 57,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(0)),
Max: metricdata.NewExtrema(int64(0)),
Sum: 0,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: "grpc.server.call.rcvd_total_compressed_message_size",
Unit: "By",
Description: "Compressed message bytes received per server call.",
Data: metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(57)),
Max: metricdata.NewExtrema(int64(57)),
Sum: 57,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, statusAttr),
Count: 1,
Bounds: DefaultSizeBounds,
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema(int64(0)),
Max: metricdata.NewExtrema(int64(0)),
Sum: 0,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: "grpc.server.call.duration",
Description: "End-to-end time taken to complete a call from server transport's perspective.",
Unit: "s",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(unaryMethodAttr, statusAttr),
Count: 1,
Bounds: DefaultLatencyBounds,
},
{
Attributes: attribute.NewSet(duplexMethodAttr, statusAttr),
Count: 1,
Bounds: DefaultLatencyBounds,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
}
for _, metric := range wantMetrics {
if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" {
// Sync the metric reader to see the event because stats.End is
// handled async server side. Thus, poll until metrics created from
// stats.End show up.
if gotMetrics, err = waitForServerCompletedRPCs(ctx, reader, metric, t); err != nil {
t.Fatalf("error waiting for sent total compressed message size for metric: %v", metric.Name)
}
continue
}
// If one of the duration metrics, ignore the bucket counts, and make
// sure its count falls within a bucket <= 5 seconds (the maximum duration
// of the test due to the context).
val, ok := gotMetrics[metric.Name]
if !ok {
t.Fatalf("metric %v not present in recorded metrics", metric.Name)
}
if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" {
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) {
t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
}
if err := assertDataPointWithinFiveSeconds(val); err != nil {
t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err)
}
continue
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
}
}
wantMetrics := testutils.MetricData(testutils.MetricDataOptions{
Target: ss.Target,
UnaryCompressedMessageSize: float64(57),
})
testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics)
stream, err = ss.Client.FullDuplexCall(ctx)
if err != nil {
@ -523,7 +225,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
stream.CloseSend()
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err)
}
// This Invoke doesn't pass the StaticMethodCallOption. Thus, the method
// attribute should become "other" on client side metrics. Since it is also
@ -541,6 +243,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
gotMetrics[m.Name] = m
}
}
unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
targetAttr := attribute.String("grpc.target", ss.Target)
otherMethodAttr := attribute.String("grpc.method", "other")
wantMetrics = []metricdata.Metrics{
{
@ -593,10 +299,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) {
for _, metric := range wantMetrics {
val, ok := gotMetrics[metric.Name]
if !ok {
t.Fatalf("metric %v not present in recorded metrics", metric.Name)
t.Fatalf("Metric %v not present in recorded metrics", metric.Name)
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
t.Fatalf("Metrics data type not equal for metric: %v", metric.Name)
}
}
}

View File

@ -17,8 +17,6 @@
package opentelemetry_test
import (
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/stats/opentelemetry"
@ -55,9 +53,6 @@ func Example_dialOption() {
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics, // equivalent to unset - distinct from empty
TargetAttributeFilter: func(str string) bool {
return !strings.HasPrefix(str, "dns") // Filter out DNS targets.
},
},
}
do := opentelemetry.DialOption(opts)

View File

@ -0,0 +1,796 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package testutils contains helpers for OpenTelemetry tests.
package testutils
import (
"context"
"fmt"
"testing"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
// Redefine default bounds here to avoid a cyclic dependency with top level
// opentelemetry package. Could define once through internal, but would make
// external opentelemetry godoc less readable.
var (
	// DefaultLatencyBounds are the default histogram bucket bounds (in
	// seconds, matching the "s" unit of the duration metrics) for latency
	// metrics.
	DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100}
	// DefaultSizeBounds are the default histogram bucket bounds (in bytes,
	// matching the "By" unit) for metrics which record size.
	DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296}
)
// waitForServerCompletedRPCs waits until the unary and streaming stats.End
// calls are finished processing. It does this by polling (once per
// millisecond, until ctx expires) for the expected metric triggered by
// stats.End to appear through the passed in metrics reader.
//
// Returns a new gotMetrics map containing the metric data being polled for,
// or an error if the metric never showed up or collection failed.
func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) {
	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
		rm := &metricdata.ResourceMetrics{}
		// Surface Collect failures instead of silently asserting against a
		// possibly empty/stale snapshot.
		if err := reader.Collect(ctx, rm); err != nil {
			return nil, fmt.Errorf("error collecting metrics: %v", err)
		}
		gotMetrics := map[string]metricdata.Metrics{}
		for _, sm := range rm.ScopeMetrics {
			for _, m := range sm.Metrics {
				gotMetrics[m.Name] = m
			}
		}
		val, ok := gotMetrics[wantMetric.Name]
		if !ok {
			continue
		}
		// AssertEqual reports mismatches through t, so its bool return is
		// deliberately ignored here.
		metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
		return gotMetrics, nil
	}
	return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err())
}
// checkDataPointWithinFiveSeconds checks that every histogram data point in
// the passed metric was recorded in a bucket whose upper bound is at most
// five seconds. Returns an error if the metric is not a float64 histogram or
// if a data point only falls in buckets above five seconds.
func checkDataPointWithinFiveSeconds(metric metricdata.Metrics) error {
	histo, ok := metric.Data.(metricdata.Histogram[float64])
	if !ok {
		return fmt.Errorf("metric data is not histogram")
	}
	for _, dataPoint := range histo.DataPoints {
		// Find the index of the first bound >= 5. Bucket i counts recordings
		// in (bounds[i-1], bounds[i]], so buckets 0..boundWithFive all hold
		// values of at most five seconds. The previous version kept
		// overwriting without breaking, which selected the LAST bound >= 5
		// (~100s) and made the check far too permissive.
		boundWithFive := len(dataPoint.Bounds)
		for i, bound := range dataPoint.Bounds {
			if bound >= 5 {
				boundWithFive = i
				break
			}
		}
		foundPoint := false
		for i, count := range dataPoint.BucketCounts {
			if i > boundWithFive {
				return fmt.Errorf("data point not found in bucket <=5 seconds")
			}
			// >= 1 rather than == 1 so an aggregated bucket holding several
			// recordings still counts as found.
			if count >= 1 {
				foundPoint = true
				break
			}
		}
		if !foundPoint {
			return fmt.Errorf("no data point found for metric")
		}
	}
	return nil
}
// MetricDataOptions are the options used to configure the expected metrics
// data returned from MetricData, MetricDataUnary and MetricDataStreaming.
type MetricDataOptions struct {
	// CSMLabels are the csm labels to attach to metrics which receive csm
	// labels (all A66 metrics except client call metrics and started-RPC
	// counts on the client and server side).
	CSMLabels []attribute.KeyValue
	// Target is the target of the client and server.
	Target string
	// UnaryCallFailed is whether the Unary Call failed, which would trigger
	// trailers only.
	UnaryCallFailed bool
	// UnaryCompressedMessageSize is the compressed message size of the Unary
	// RPC. This assumes both client and server sent the same message size.
	UnaryCompressedMessageSize float64
	// StreamingCompressedMessageSize is the compressed message size of the
	// Streaming RPC. This assumes both client and server sent the same message
	// size.
	StreamingCompressedMessageSize float64
}
// createBucketCounts creates a list of bucket counts based off the
// recordingPoints and bounds. Both recordingPoints and bounds are assumed to be
// in order.
func createBucketCounts(recordingPoints []float64, bounds []float64) []uint64 {
var bucketCounts []uint64
var recordingPointIndex int
for _, bound := range bounds {
var bucketCount uint64
if recordingPointIndex >= len(recordingPoints) {
bucketCounts = append(bucketCounts, bucketCount)
continue
}
for recordingPoints[recordingPointIndex] <= bound {
bucketCount += 1
recordingPointIndex += 1
if recordingPointIndex >= len(recordingPoints) {
break
}
}
bucketCounts = append(bucketCounts, bucketCount)
}
// The rest of the recording points are last bound -> infinity.
bucketCounts = append(bucketCounts, uint64(len(recordingPoints)-recordingPointIndex))
return bucketCounts
}
// MetricDataUnary returns a list of expected metrics defined in A66 for a
// client and server for one unary RPC. The status label is "OK" unless
// options.UnaryCallFailed is set (trailers only), in which case it is
// "UNKNOWN". CSM labels from options are appended to the end-of-call metrics
// on both the client and the server side.
func MetricDataUnary(options MetricDataOptions) []metricdata.Metrics {
	methodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
	targetAttr := attribute.String("grpc.target", options.Target)
	statusAttr := attribute.String("grpc.status", "OK")
	if options.UnaryCallFailed {
		statusAttr = attribute.String("grpc.status", "UNKNOWN")
	}
	// Attributes recorded on end-of-call metrics; only the client side
	// carries the target attribute.
	clientSideEnd := []attribute.KeyValue{
		methodAttr,
		targetAttr,
		statusAttr,
	}
	serverSideEnd := []attribute.KeyValue{
		methodAttr,
		statusAttr,
	}
	clientSideEnd = append(clientSideEnd, options.CSMLabels...)
	serverSideEnd = append(serverSideEnd, options.CSMLabels...)
	// Client and server are assumed to have sent and received the same
	// compressed message size, so one set of histogram values covers all
	// four size metrics below.
	compressedBytesSentRecv := int64(options.UnaryCompressedMessageSize)
	bucketCounts := createBucketCounts([]float64{options.UnaryCompressedMessageSize}, DefaultSizeBounds)
	extrema := metricdata.NewExtrema(int64(options.UnaryCompressedMessageSize))
	return []metricdata.Metrics{
		{
			Name:        "grpc.client.attempt.started",
			Description: "Number of client call attempts started.",
			Unit:        "attempt",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(methodAttr, targetAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		},
		{
			Name:        "grpc.client.attempt.duration",
			Description: "End-to-end time taken to complete a client call attempt.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(clientSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.sent_total_compressed_message_size",
			Description: "Compressed message bytes sent per client call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(clientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.rcvd_total_compressed_message_size",
			Description: "Compressed message bytes received per call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(clientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.call.duration",
			Description: "Time taken by gRPC to complete an RPC from application's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(methodAttr, targetAttr, statusAttr),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.started",
			Description: "Number of server calls started.",
			Unit:        "call",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(methodAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		},
		{
			Name:        "grpc.server.call.sent_total_compressed_message_size",
			Unit:        "By",
			Description: "Compressed message bytes sent per server call.",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(serverSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.rcvd_total_compressed_message_size",
			Unit:        "By",
			Description: "Compressed message bytes received per server call.",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(serverSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.duration",
			Description: "End-to-end time taken to complete a call from server transport's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(serverSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
	}
}
// MetricDataStreaming returns a list of expected metrics defined in A66 for a
// client and server for one streaming RPC with status "OK". CSM labels from
// options are appended to the end-of-call metrics on both the client and the
// server side.
func MetricDataStreaming(options MetricDataOptions) []metricdata.Metrics {
	methodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
	targetAttr := attribute.String("grpc.target", options.Target)
	statusAttr := attribute.String("grpc.status", "OK")
	// Attributes recorded on end-of-call metrics; only the client side
	// carries the target attribute.
	clientSideEnd := []attribute.KeyValue{
		methodAttr,
		targetAttr,
		statusAttr,
	}
	serverSideEnd := []attribute.KeyValue{
		methodAttr,
		statusAttr,
	}
	clientSideEnd = append(clientSideEnd, options.CSMLabels...)
	serverSideEnd = append(serverSideEnd, options.CSMLabels...)
	// Client and server are assumed to have sent and received the same
	// compressed message size, so one set of histogram values covers all
	// four size metrics below.
	compressedBytesSentRecv := int64(options.StreamingCompressedMessageSize)
	bucketCounts := createBucketCounts([]float64{options.StreamingCompressedMessageSize}, DefaultSizeBounds)
	extrema := metricdata.NewExtrema(int64(options.StreamingCompressedMessageSize))
	return []metricdata.Metrics{
		{
			Name:        "grpc.client.attempt.started",
			Description: "Number of client call attempts started.",
			Unit:        "attempt",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(methodAttr, targetAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		},
		{
			Name:        "grpc.client.attempt.duration",
			Description: "End-to-end time taken to complete a client call attempt.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(clientSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.sent_total_compressed_message_size",
			Description: "Compressed message bytes sent per client call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(clientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.rcvd_total_compressed_message_size",
			Description: "Compressed message bytes received per call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(clientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.call.duration",
			Description: "Time taken by gRPC to complete an RPC from application's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(methodAttr, targetAttr, statusAttr),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.started",
			Description: "Number of server calls started.",
			Unit:        "call",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(methodAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		},
		{
			Name:        "grpc.server.call.sent_total_compressed_message_size",
			Unit:        "By",
			Description: "Compressed message bytes sent per server call.",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(serverSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.rcvd_total_compressed_message_size",
			Unit:        "By",
			Description: "Compressed message bytes received per server call.",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(serverSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: bucketCounts,
						Min:          extrema,
						Max:          extrema,
						Sum:          compressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.duration",
			Description: "End-to-end time taken to complete a call from server transport's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(serverSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
	}
}
// MetricData returns the expected A66 metrics for a client and a server after
// one unary RPC and one streaming RPC with the compressed message sizes given
// in options. The unary status label is "OK" unless options.UnaryCallFailed
// is set, in which case it is "UNKNOWN"; the streaming status is always "OK".
// CSM labels from options are appended to the end-of-call metrics on both
// sides (started-RPC counts and client call duration do not receive them).
func MetricData(options MetricDataOptions) []metricdata.Metrics {
	unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
	duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
	targetAttr := attribute.String("grpc.target", options.Target)
	unaryStatusAttr := attribute.String("grpc.status", "OK")
	streamingStatusAttr := attribute.String("grpc.status", "OK")
	if options.UnaryCallFailed {
		unaryStatusAttr = attribute.String("grpc.status", "UNKNOWN")
	}
	// Attributes recorded on end-of-call metrics; only the client side
	// carries the target attribute.
	unaryMethodClientSideEnd := []attribute.KeyValue{
		unaryMethodAttr,
		targetAttr,
		unaryStatusAttr,
	}
	streamingMethodClientSideEnd := []attribute.KeyValue{
		duplexMethodAttr,
		targetAttr,
		streamingStatusAttr,
	}
	unaryMethodServerSideEnd := []attribute.KeyValue{
		unaryMethodAttr,
		unaryStatusAttr,
	}
	streamingMethodServerSideEnd := []attribute.KeyValue{
		duplexMethodAttr,
		streamingStatusAttr,
	}
	unaryMethodClientSideEnd = append(unaryMethodClientSideEnd, options.CSMLabels...)
	streamingMethodClientSideEnd = append(streamingMethodClientSideEnd, options.CSMLabels...)
	unaryMethodServerSideEnd = append(unaryMethodServerSideEnd, options.CSMLabels...)
	streamingMethodServerSideEnd = append(streamingMethodServerSideEnd, options.CSMLabels...)
	// Client and server are assumed to have sent and received the same
	// compressed message size per RPC, so each RPC contributes one set of
	// histogram values to all four size metrics below.
	unaryCompressedBytesSentRecv := int64(options.UnaryCompressedMessageSize)
	unaryBucketCounts := createBucketCounts([]float64{options.UnaryCompressedMessageSize}, DefaultSizeBounds)
	unaryExtrema := metricdata.NewExtrema(int64(options.UnaryCompressedMessageSize))
	streamingCompressedBytesSentRecv := int64(options.StreamingCompressedMessageSize)
	streamingBucketCounts := createBucketCounts([]float64{options.StreamingCompressedMessageSize}, DefaultSizeBounds)
	streamingExtrema := metricdata.NewExtrema(int64(options.StreamingCompressedMessageSize))
	return []metricdata.Metrics{
		{
			Name:        "grpc.client.attempt.started",
			Description: "Number of client call attempts started.",
			Unit:        "attempt",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr),
						Value:      1,
					},
					{
						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		},
		{
			Name:        "grpc.client.attempt.duration",
			Description: "End-to-end time taken to complete a client call attempt.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(unaryMethodClientSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
					{
						Attributes: attribute.NewSet(streamingMethodClientSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.sent_total_compressed_message_size",
			Description: "Compressed message bytes sent per client call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(unaryMethodClientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: unaryBucketCounts,
						Min:          unaryExtrema,
						Max:          unaryExtrema,
						Sum:          unaryCompressedBytesSentRecv,
					},
					{
						Attributes:   attribute.NewSet(streamingMethodClientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: streamingBucketCounts,
						Min:          streamingExtrema,
						Max:          streamingExtrema,
						Sum:          streamingCompressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.attempt.rcvd_total_compressed_message_size",
			Description: "Compressed message bytes received per call attempt.",
			Unit:        "By",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(unaryMethodClientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: unaryBucketCounts,
						Min:          unaryExtrema,
						Max:          unaryExtrema,
						Sum:          unaryCompressedBytesSentRecv,
					},
					{
						Attributes:   attribute.NewSet(streamingMethodClientSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: streamingBucketCounts,
						Min:          streamingExtrema,
						Max:          streamingExtrema,
						Sum:          streamingCompressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.client.call.duration",
			Description: "Time taken by gRPC to complete an RPC from application's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
					{
						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, streamingStatusAttr),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.started",
			Description: "Number of server calls started.",
			Unit:        "call",
			Data: metricdata.Sum[int64]{
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(unaryMethodAttr),
						Value:      1,
					},
					{
						Attributes: attribute.NewSet(duplexMethodAttr),
						Value:      1,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
			},
		},
		{
			Name:        "grpc.server.call.sent_total_compressed_message_size",
			Unit:        "By",
			Description: "Compressed message bytes sent per server call.",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(unaryMethodServerSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: unaryBucketCounts,
						Min:          unaryExtrema,
						Max:          unaryExtrema,
						Sum:          unaryCompressedBytesSentRecv,
					},
					{
						Attributes:   attribute.NewSet(streamingMethodServerSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: streamingBucketCounts,
						Min:          streamingExtrema,
						Max:          streamingExtrema,
						Sum:          streamingCompressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.rcvd_total_compressed_message_size",
			Unit:        "By",
			Description: "Compressed message bytes received per server call.",
			Data: metricdata.Histogram[int64]{
				DataPoints: []metricdata.HistogramDataPoint[int64]{
					{
						Attributes:   attribute.NewSet(unaryMethodServerSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: unaryBucketCounts,
						Min:          unaryExtrema,
						Max:          unaryExtrema,
						Sum:          unaryCompressedBytesSentRecv,
					},
					{
						Attributes:   attribute.NewSet(streamingMethodServerSideEnd...),
						Count:        1,
						Bounds:       DefaultSizeBounds,
						BucketCounts: streamingBucketCounts,
						Min:          streamingExtrema,
						Max:          streamingExtrema,
						Sum:          streamingCompressedBytesSentRecv,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
		{
			Name:        "grpc.server.call.duration",
			Description: "End-to-end time taken to complete a call from server transport's perspective.",
			Unit:        "s",
			Data: metricdata.Histogram[float64]{
				DataPoints: []metricdata.HistogramDataPoint[float64]{
					{
						Attributes: attribute.NewSet(unaryMethodServerSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
					{
						Attributes: attribute.NewSet(streamingMethodServerSideEnd...),
						Count:      1,
						Bounds:     DefaultLatencyBounds,
					},
				},
				Temporality: metricdata.CumulativeTemporality,
			},
		},
	}
}
// CompareMetrics asserts that the recorded metrics in gotMetrics match
// wantMetrics. Server end-of-call size metrics are polled for through mr,
// since stats.End is handled asynchronously on the server side. For duration
// metrics the recorded values are ignored, but each data point must land in
// a bucket of at most five seconds (the maximum possible testing time given
// the context timeout).
func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) {
	for _, want := range wantMetrics {
		switch want.Name {
		case "grpc.server.call.sent_total_compressed_message_size", "grpc.server.call.rcvd_total_compressed_message_size":
			// stats.End runs asynchronously on the server side, so poll the
			// reader until the metrics it creates show up.
			var err error
			gotMetrics, err = waitForServerCompletedRPCs(ctx, t, mr, want)
			if err != nil {
				t.Fatal(err)
			}
			continue
		}
		val, ok := gotMetrics[want.Name]
		if !ok {
			t.Fatalf("Metric %v not present in recorded metrics", want.Name)
		}
		switch want.Name {
		case "grpc.client.attempt.duration", "grpc.client.call.duration", "grpc.server.call.duration":
			// Ignore the recorded duration values, but make sure the data
			// point falls within a bucket <= 5 seconds.
			if !metricdatatest.AssertEqual(t, want, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) {
				t.Fatalf("Metrics data type not equal for metric: %v", want.Name)
			}
			if err := checkDataPointWithinFiveSeconds(val); err != nil {
				t.Fatalf("Data point not within five seconds for metric %v: %v", want.Name, err)
			}
		default:
			if !metricdatatest.AssertEqual(t, want, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
				t.Fatalf("Metrics data type not equal for metric: %v", want.Name)
			}
		}
	}
}

View File

@ -120,12 +120,6 @@ type MetricsOptions struct {
// will be recorded.
Metrics *Metrics
// TargetAttributeFilter is a callback that takes the target string of the
// channel and returns a bool representing whether to use target as a label
// value or use the string "other". If unset, will use the target string as
// is. This only applies for client side metrics.
TargetAttributeFilter func(string) bool
// MethodAttributeFilter is to record the method name of RPCs handled by
// grpc.UnknownServiceHandler, but take care to limit the values allowed, as
// allowing too many will increase cardinality and could cause severe memory

View File

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.65.0-dev"
const Version = "1.65.0"

View File

@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
@ -85,6 +86,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
},
@ -94,6 +96,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName + "-new",
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
},
@ -108,6 +111,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: "dns_host:8080",
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
},
@ -117,6 +121,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: "dns_host_new:8080",
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
},
@ -211,12 +216,14 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
{
Cluster: dnsClusterName,
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
@ -247,12 +254,14 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
{
Cluster: dnsClusterNameNew,
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: fmt.Sprintf("%s:%d", dnsHostNameNew, dnsPort),
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
@ -298,12 +307,14 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
{
Cluster: dnsClusterName,
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
@ -329,6 +340,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
@ -363,6 +375,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
@ -391,12 +404,14 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
{
Cluster: dnsClusterName,
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
@ -420,6 +435,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
@ -572,6 +588,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
@ -639,12 +656,14 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
{
Cluster: clusterNameD,
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
},
},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
@ -727,6 +746,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
@ -832,6 +852,7 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: internal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}

View File

@ -42,6 +42,7 @@ import (
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@ -456,6 +457,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
EDSServiceName: serviceName,
MaxConcurrentRequests: newUint32(512),
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: xdsinternal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
},
@ -483,6 +485,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: xdsinternal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":100, "maxRingSize":1000}}]`),
},
@ -505,6 +508,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{"successRateEjection":{}}`),
TelemetryLabels: xdsinternal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":1024, "maxRingSize":8388608}}]`),
},
@ -557,6 +561,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) {
"requestVolume": 50
}
}`),
TelemetryLabels: xdsinternal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":1024, "maxRingSize":8388608}}]`),
},
@ -607,6 +612,7 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
Creds: bootstrap.ChannelCreds{Type: "insecure"},
},
OutlierDetection: json.RawMessage(`{}`),
TelemetryLabels: xdsinternal.UnknownCSMLabels,
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}

View File

@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/priority"
@ -400,8 +401,9 @@ func (s) TestOutlierDetectionConfigPropagationToChildPolicy(t *testing.T) {
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: clusterimpl.Name,
Config: &clusterimpl.LBConfig{
Cluster: clusterName,
EDSServiceName: edsServiceName,
Cluster: clusterName,
EDSServiceName: edsServiceName,
TelemetryLabels: xdsinternal.UnknownCSMLabels,
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{

View File

@ -83,3 +83,10 @@ func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address {
// ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType.
var ResourceTypeMapForTesting map[string]any
// UnknownCSMLabels are TelemetryLabels emitted from CDS if CSM Telemetry Label
// data is not present in the CDS Resource.
var UnknownCSMLabels = map[string]string{
"csm.service_name": "unknown",
"csm.service_namespace_name": "unknown",
}

View File

@ -103,7 +103,7 @@ func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy")}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels")}
if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" {
return fmt.Errorf("received unexpected diff in the cluster resource update: (-want, got):\n%s", diff)
}

View File

@ -871,7 +871,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
}
cmpOpts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels"),
}
if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" {
t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff)

View File

@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@ -137,9 +138,10 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
ClusterType: xdsresource.ClusterTypeLogicalDNS,
DNSHostName: "dns_host:8080",
ClusterName: clusterName,
ClusterType: xdsresource.ClusterTypeLogicalDNS,
DNSHostName: "dns_host:8080",
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -168,6 +170,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
ClusterName: clusterName,
ClusterType: xdsresource.ClusterTypeAggregate,
PrioritizedClusterNames: []string{"a", "b", "c"},
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -179,9 +182,12 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
{
name: "happy-case-no-service-name-no-lrs",
cluster: e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone),
wantUpdate: xdsresource.ClusterUpdate{ClusterName: clusterName},
name: "happy-case-no-service-name-no-lrs",
cluster: e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone),
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{
@ -195,8 +201,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
name: "happy-case-no-lrs",
cluster: e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone),
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -219,6 +226,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
ClusterName: clusterName,
EDSServiceName: serviceName,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -257,6 +265,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
EDSServiceName: serviceName,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
MaxRequests: func() *uint32 { i := uint32(512); return &i }(),
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -275,8 +284,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
return c
}(),
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "ring_hash_experimental",
@ -302,8 +312,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
@ -326,8 +337,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
return c
}(),
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "ring_hash_experimental",
@ -358,8 +370,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
@ -396,8 +409,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "ring_hash_experimental",
@ -431,8 +445,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -470,8 +485,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
@ -518,8 +534,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
TelemetryLabels: internal.UnknownCSMLabels,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "ring_hash_experimental",

View File

@ -100,6 +100,15 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serv
}
}
}
// "The values for the service labels csm.service_name and
// csm.service_namespace_name come from xDS, “unknown” if not present." -
// CSM Design.
if _, ok := telemetryLabels["csm.service_name"]; !ok {
telemetryLabels["csm.service_name"] = "unknown"
}
if _, ok := telemetryLabels["csm.service_namespace_name"]; !ok {
telemetryLabels["csm.service_namespace_name"] = "unknown"
}
var lbPolicy json.RawMessage
var err error

View File

@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
@ -873,6 +874,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
RootInstanceName: rootPluginInstance,
RootCertName: rootCertName,
},
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -914,6 +916,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
RootInstanceName: rootPluginInstance,
RootCertName: rootCertName,
},
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -959,6 +962,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
IdentityInstanceName: identityPluginInstance,
IdentityCertName: identityCertName,
},
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -1006,6 +1010,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
IdentityInstanceName: identityPluginInstance,
IdentityCertName: identityCertName,
},
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -1072,6 +1077,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
matcher.StringMatcherForTesting(nil, nil, nil, newStringP(sanContains), nil, false),
},
},
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -1138,6 +1144,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
matcher.StringMatcherForTesting(nil, nil, nil, newStringP(sanContains), nil, false),
},
},
TelemetryLabels: internal.UnknownCSMLabels,
},
},
}
@ -1326,6 +1333,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
EDSServiceName: v3Service,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
Raw: v3ClusterAny,
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -1338,6 +1346,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
EDSServiceName: v3Service,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
Raw: v3ClusterAny,
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -1350,6 +1359,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
EDSServiceName: v3Service,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
Raw: v3ClusterAnyWithEDSConfigSourceSelf,
TelemetryLabels: internal.UnknownCSMLabels,
},
},
{
@ -1379,7 +1389,8 @@ func (s) TestUnmarshalCluster(t *testing.T) {
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
Raw: v3ClusterAnyWithTelemetryLabelsIgnoreSome,
TelemetryLabels: map[string]string{
"csm.service_name": "grpc-service",
"csm.service_name": "grpc-service",
"csm.service_namespace_name": "unknown",
},
},
},