From f69e9ad8d487ea92fad1f5c854a45541bd62dcc5 Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Mon, 6 Feb 2023 20:00:14 -0500 Subject: [PATCH] stats/opencensus: Add OpenCensus metrics support (#5923) --- codes/code_string.go | 51 +- internal/internal.go | 3 + stats/opencensus/client_metrics.go | 116 ++++ stats/opencensus/e2e_test.go | 940 +++++++++++++++++++++++++++++ stats/opencensus/go.mod | 1 + stats/opencensus/opencensus.go | 10 +- stats/opencensus/server_metrics.go | 115 ++++ stats/opencensus/stats.go | 215 +++++++ 8 files changed, 1448 insertions(+), 3 deletions(-) create mode 100644 stats/opencensus/client_metrics.go create mode 100644 stats/opencensus/e2e_test.go create mode 100644 stats/opencensus/server_metrics.go create mode 100644 stats/opencensus/stats.go diff --git a/codes/code_string.go b/codes/code_string.go index 0b206a578..934fac2b0 100644 --- a/codes/code_string.go +++ b/codes/code_string.go @@ -18,7 +18,15 @@ package codes -import "strconv" +import ( + "strconv" + + "google.golang.org/grpc/internal" +) + +func init() { + internal.CanonicalString = canonicalString +} func (c Code) String() string { switch c { @@ -60,3 +68,44 @@ func (c Code) String() string { return "Code(" + strconv.FormatInt(int64(c), 10) + ")" } } + +func canonicalString(c Code) string { + switch c { + case OK: + return "OK" + case Canceled: + return "CANCELLED" + case Unknown: + return "UNKNOWN" + case InvalidArgument: + return "INVALID_ARGUMENT" + case DeadlineExceeded: + return "DEADLINE_EXCEEDED" + case NotFound: + return "NOT_FOUND" + case AlreadyExists: + return "ALREADY_EXISTS" + case PermissionDenied: + return "PERMISSION_DENIED" + case ResourceExhausted: + return "RESOURCE_EXHAUSTED" + case FailedPrecondition: + return "FAILED_PRECONDITION" + case Aborted: + return "ABORTED" + case OutOfRange: + return "OUT_OF_RANGE" + case Unimplemented: + return "UNIMPLEMENTED" + case Internal: + return "INTERNAL" + case Unavailable: + return "UNAVAILABLE" + case DataLoss: + return "DATA_LOSS" + case Unauthenticated: + return "UNAUTHENTICATED" + default: + return "CODE(" + strconv.FormatInt(int64(c), 10) + ")" + } +} diff --git a/internal/internal.go b/internal/internal.go index cb5139a19..dc9fcd110 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -61,6 +61,9 @@ var ( // gRPC server. An xDS-enabled server needs to know what type of credentials // is configured on the underlying gRPC server. This is set by server.go. GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials + // CanonicalString returns the canonical string of the code defined here: + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md. + CanonicalString interface{} // func (codes.Code) string // DrainServerTransports initiates a graceful close of existing connections // on a gRPC server accepted on the provided listener address. An // xDS-enabled server invokes this method on a grpc.Server when a particular diff --git a/stats/opencensus/client_metrics.go b/stats/opencensus/client_metrics.go new file mode 100644 index 000000000..3ea5e1c2c --- /dev/null +++ b/stats/opencensus/client_metrics.go @@ -0,0 +1,116 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opencensus + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + keyClientMethod = tag.MustNewKey("grpc_client_method") + keyClientStatus = tag.MustNewKey("grpc_client_status") +) + +// Measures, which are recorded by client stats handler: Note that due to the +// nature of how stats handlers are called on gRPC's client side, the per rpc +// unit is actually per attempt throughout this definition file. +var ( + clientSentMessagesPerRPC = stats.Int64("grpc.io/client/sent_messages_per_rpc", "Number of messages sent in the RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless) + clientSentBytesPerRPC = stats.Int64("grpc.io/client/sent_bytes_per_rpc", "Total bytes sent across all request messages per RPC.", stats.UnitBytes) + clientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless) + clientReceivedBytesPerRPC = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes) + clientRoundtripLatency = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds) + clientStartedRPCs = stats.Int64("grpc.io/client/started_rpcs", "The total number of client RPCs ever opened, including those that have not completed.", stats.UnitDimensionless) + clientServerLatency = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds) +) + +var ( + // ClientSentMessagesPerRPCView is the distribution of sent messages per + // RPC, keyed on method. + ClientSentMessagesPerRPCView = &view.View{ + Measure: clientSentMessagesPerRPC, + Name: "grpc.io/client/sent_messages_per_rpc", + Description: "Distribution of sent messages per RPC, by method.", + TagKeys: []tag.Key{keyClientMethod}, + Aggregation: countDistribution, + } + // ClientReceivedMessagesPerRPCView is the distribution of received messages + // per RPC, keyed on method. + ClientReceivedMessagesPerRPCView = &view.View{ + Measure: clientReceivedMessagesPerRPC, + Name: "grpc.io/client/received_messages_per_rpc", + Description: "Distribution of received messages per RPC, by method.", + TagKeys: []tag.Key{keyClientMethod}, + Aggregation: countDistribution, + } + // ClientSentBytesPerRPCView is the distribution of sent bytes per RPC, + // keyed on method. + ClientSentBytesPerRPCView = &view.View{ + Measure: clientSentBytesPerRPC, + Name: "grpc.io/client/sent_bytes_per_rpc", + Description: "Distribution of sent bytes per RPC, by method.", + TagKeys: []tag.Key{keyClientMethod}, + Aggregation: bytesDistribution, + } + // ClientReceivedBytesPerRPCView is the distribution of received bytes per + // RPC, keyed on method. 
+ ClientReceivedBytesPerRPCView = &view.View{ + Measure: clientReceivedBytesPerRPC, + Name: "grpc.io/client/received_bytes_per_rpc", + Description: "Distribution of received bytes per RPC, by method.", + TagKeys: []tag.Key{keyClientMethod}, + Aggregation: bytesDistribution, + } + // ClientStartedRPCsView is the count of opened RPCs, keyed on method. + ClientStartedRPCsView = &view.View{ + Measure: clientStartedRPCs, + Name: "grpc.io/client/started_rpcs", + Description: "Number of opened client RPCs, by method.", + TagKeys: []tag.Key{keyClientMethod}, + Aggregation: view.Count(), + } + // ClientCompletedRPCsView is the count of completed RPCs, keyed on method + // and status. + ClientCompletedRPCsView = &view.View{ + Measure: clientRoundtripLatency, + Name: "grpc.io/client/completed_rpcs", + Description: "Number of completed RPCs by method and status.", + TagKeys: []tag.Key{keyClientMethod, keyClientStatus}, + Aggregation: view.Count(), + } + // ClientRoundtripLatencyView is the distribution of round-trip latency in + // milliseconds per RPC, keyed on method. + ClientRoundtripLatencyView = &view.View{ + Measure: clientRoundtripLatency, + Name: "grpc.io/client/roundtrip_latency", + Description: "Distribution of round-trip latency, by method.", + TagKeys: []tag.Key{keyClientMethod}, + Aggregation: millisecondsDistribution, + } +) + +// DefaultClientViews is the set of client views which are considered the +// minimum required to monitor client side performance. +var DefaultClientViews = []*view.View{ + ClientSentBytesPerRPCView, + ClientReceivedBytesPerRPCView, + ClientRoundtripLatencyView, + ClientCompletedRPCsView, + ClientStartedRPCsView, +} diff --git a/stats/opencensus/e2e_test.go b/stats/opencensus/e2e_test.go new file mode 100644 index 000000000..d302ec94d --- /dev/null +++ b/stats/opencensus/e2e_test.go @@ -0,0 +1,940 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opencensus + +import ( + "context" + "errors" + "fmt" + "io" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "google.golang.org/grpc" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/leakcheck" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/test/grpc_testing" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func init() { + // OpenCensus, once included in binary, will spawn a global goroutine + // recorder that is not controllable by application. 
+ // https://github.com/census-instrumentation/opencensus-go/issues/1191 + leakcheck.RegisterIgnoreGoroutine("go.opencensus.io/stats/view.(*worker).start") +} + +var defaultTestTimeout = 5 * time.Second + +type fakeExporter struct { + t *testing.T + + mu sync.RWMutex + seenViews map[string]*viewInformation +} + +// viewInformation is information Exported from the view package through +// ExportView relevant to testing, i.e. a reasonably non flaky expectation of +// desired emissions to Exporter. +type viewInformation struct { + aggType view.AggType + aggBuckets []float64 + desc string + tagKeys []tag.Key + rows []*view.Row +} + +func (fe *fakeExporter) ExportView(vd *view.Data) { + fe.mu.Lock() + defer fe.mu.Unlock() + fe.seenViews[vd.View.Name] = &viewInformation{ + aggType: vd.View.Aggregation.Type, + aggBuckets: vd.View.Aggregation.Buckets, + desc: vd.View.Description, + tagKeys: vd.View.TagKeys, + rows: vd.Rows, + } +} + +// compareRows compares rows with respect to the information desired to test. +// Both the tags representing the rows and also the data of the row are tested +// for equality. Rows are in nondeterministic order when ExportView is called, +// but handled inside this function by sorting. +func compareRows(rows []*view.Row, rows2 []*view.Row) bool { + if len(rows) != len(rows2) { + return false + } + // Sort both rows according to the same rule. This is to take away non + // determinism in the row ordering passed to the Exporter, while keeping the + // row data. + sort.Slice(rows, func(i, j int) bool { + return rows[i].String() > rows[j].String() + }) + + sort.Slice(rows2, func(i, j int) bool { + return rows2[i].String() > rows2[j].String() + }) + + for i, row := range rows { + if !cmp.Equal(row.Tags, rows2[i].Tags, cmp.Comparer(func(a tag.Key, b tag.Key) bool { + return a.Name() == b.Name() + })) { + return false + } + if !compareData(row.Data, rows2[i].Data) { + return false + } + } + return true +} + +// compareData returns whether the two aggregation data's are equal to each +// other with respect to parts of the data desired for correct emission. The +// function first makes sure the two types of aggregation data are the same, and +// then checks the equality for the respective aggregation data type. +func compareData(ad view.AggregationData, ad2 view.AggregationData) bool { + if ad == nil && ad2 == nil { + return true + } + if ad == nil || ad2 == nil { + return false + } + if reflect.TypeOf(ad) != reflect.TypeOf(ad2) { + return false + } + switch ad1 := ad.(type) { + case *view.DistributionData: + dd2 := ad2.(*view.DistributionData) + // Count and Count Per Buckets are reasonable for correctness, + // especially since we verify equality of bucket endpoints elsewhere. + if ad1.Count != dd2.Count { + return false + } + for i, count := range ad1.CountPerBucket { + if count != dd2.CountPerBucket[i] { + return false + } + } + case *view.CountData: + cd2 := ad2.(*view.CountData) + return ad1.Value == cd2.Value + + // gRPC open census plugin does not have these next two types of aggregation + // data types present, for now just check for type equality between the two + // aggregation data points (done above). 
+ // case *view.SumData + // case *view.LastValueData: + } + return true +} + +func (vi *viewInformation) Equal(vi2 *viewInformation) bool { + if vi == nil && vi2 == nil { + return true + } + if vi == nil || vi2 == nil { + return false + } + if vi.aggType != vi2.aggType { + return false + } + if !cmp.Equal(vi.aggBuckets, vi2.aggBuckets) { + return false + } + if vi.desc != vi2.desc { + return false + } + if !cmp.Equal(vi.tagKeys, vi2.tagKeys, cmp.Comparer(func(a tag.Key, b tag.Key) bool { + return a.Name() == b.Name() + })) { + return false + } + if !compareRows(vi.rows, vi2.rows) { + return false + } + return true +} + +// distributionDataLatencyCount checks if the view information contains the +// desired distrubtion latency total count that falls in buckets of 5 seconds or +// less. This must be called with non nil view information that is aggregated +// with distribution data. Returns a nil error if correct count information +// found, non nil error if correct information not found. +func distributionDataLatencyCount(vi *viewInformation, countWant int64) error { + var totalCount int64 + var largestIndexWithFive int + for i, bucket := range vi.aggBuckets { + // Distribution for latency is measured in milliseconds, so 5 * 1000 = + // 5000. + if bucket > 5000 { + largestIndexWithFive = i + break + } + } + // Iterating through rows sums up data points for all methods. In this case, + // a data point for the unary and for the streaming RPC. + for _, row := range vi.rows { + // This could potentially have an extra measurement in buckets above 5s, + // but that's fine. Count of buckets that could contain up to 5s is a + // good enough assertion. + for i, count := range row.Data.(*view.DistributionData).CountPerBucket { + if i >= largestIndexWithFive { + break + } + totalCount = totalCount + count + } + } + if totalCount != countWant { + return fmt.Errorf("wrong total count for counts under 5: %v, wantCount: %v", totalCount, countWant) + } + return nil +} + +// TestAllMetricsOneFunction tests emitted metrics from gRPC. It registers all +// the metrics provided by this package. It then configures a system with a gRPC +// Client and gRPC server with the OpenCensus Dial and Server Option configured, +// and makes a Unary RPC and a Streaming RPC. These two RPCs should cause +// certain emissions for each registered metric through the OpenCensus View +// package. +func (s) TestAllMetricsOneFunction(t *testing.T) { + allViews := []*view.View{ + ClientStartedRPCsView, + ServerStartedRPCsView, + ClientCompletedRPCsView, + ServerCompletedRPCsView, + ClientSentBytesPerRPCView, + ServerSentBytesPerRPCView, + ClientReceivedBytesPerRPCView, + ServerReceivedBytesPerRPCView, + ClientSentMessagesPerRPCView, + ServerSentMessagesPerRPCView, + ClientReceivedMessagesPerRPCView, + ServerReceivedMessagesPerRPCView, + ClientRoundtripLatencyView, + ServerLatencyView, + } + view.Register(allViews...) + // Unregister unconditionally in this defer to correctly cleanup globals in + // error conditions. + defer view.Unregister(allViews...) 
+ fe := &fakeExporter{ + t: t, + seenViews: make(map[string]*viewInformation), + } + view.RegisterExporter(fe) + defer view.UnregisterExporter(fe) + + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return &grpc_testing.SimpleResponse{}, nil + }, + FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + } + if err := ss.Start([]grpc.ServerOption{ServerOption(TraceOptions{})}, DialOption(TraceOptions{})); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics to be emitted. + if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + + cmtk := tag.MustNewKey("grpc_client_method") + smtk := tag.MustNewKey("grpc_server_method") + cstk := tag.MustNewKey("grpc_client_status") + sstk := tag.MustNewKey("grpc_server_status") + wantMetrics := []struct { + metric *view.View + wantVI *viewInformation + }{ + { + metric: ClientStartedRPCsView, + wantVI: &viewInformation{ + aggType: view.AggTypeCount, + aggBuckets: []float64{}, + desc: "Number of opened client RPCs, by method.", + tagKeys: []tag.Key{ + cmtk, + }, + + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + }, + }, + }, + { + metric: ServerStartedRPCsView, + wantVI: &viewInformation{ + aggType: view.AggTypeCount, + aggBuckets: []float64{}, + desc: "Number of opened server RPCs, by method.", + tagKeys: []tag.Key{ + smtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + }, + }, + }, + { + metric: ClientCompletedRPCsView, + wantVI: &viewInformation{ + aggType: view.AggTypeCount, + aggBuckets: []float64{}, + desc: "Number of completed RPCs by method and status.", + tagKeys: []tag.Key{ + cmtk, + cstk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + { + Key: cstk, + Value: "OK", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + { + Key: cstk, + Value: "OK", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + }, + }, + }, + { + metric: ServerCompletedRPCsView, + wantVI: &viewInformation{ + aggType: view.AggTypeCount, + aggBuckets: []float64{}, + desc: "Number of completed RPCs by method and status.", + tagKeys: []tag.Key{ + smtk, + sstk, + }, + rows: 
[]*view.Row{ + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + { + Key: sstk, + Value: "OK", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + { + Key: sstk, + Value: "OK", + }, + }, + Data: &view.CountData{ + Value: 1, + }, + }, + }, + }, + }, + { + metric: ClientSentBytesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: bytesDistributionBounds, + desc: "Distribution of sent bytes per RPC, by method.", + tagKeys: []tag.Key{ + cmtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ServerSentBytesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: bytesDistributionBounds, + desc: "Distribution of sent bytes per RPC, by method.", + tagKeys: []tag.Key{ + smtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + + { + metric: ClientReceivedBytesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: bytesDistributionBounds, + desc: "Distribution of received bytes per RPC, by method.", + tagKeys: []tag.Key{ + cmtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ServerReceivedBytesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: bytesDistributionBounds, + desc: "Distribution of received bytes per RPC, by method.", + tagKeys: []tag.Key{ + smtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ClientSentMessagesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: countDistributionBounds, + desc: "Distribution of sent messages per RPC, by method.", + tagKeys: []tag.Key{ + cmtk, + }, + rows: []*view.Row{ 
+ { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ServerSentMessagesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: countDistributionBounds, + desc: "Distribution of sent messages per RPC, by method.", + tagKeys: []tag.Key{ + smtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ClientReceivedMessagesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: countDistributionBounds, + desc: "Distribution of received messages per RPC, by method.", + tagKeys: []tag.Key{ + cmtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: cmtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ServerReceivedMessagesPerRPCView, + wantVI: &viewInformation{ + aggType: view.AggTypeDistribution, + aggBuckets: countDistributionBounds, + desc: "Distribution of received messages per RPC, by method.", + tagKeys: []tag.Key{ + smtk, + }, + rows: []*view.Row{ + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/UnaryCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + { + Tags: []tag.Tag{ + { + Key: smtk, + Value: "grpc.testing.TestService/FullDuplexCall", + }, + }, + Data: &view.DistributionData{ + Count: 1, + CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + }, + { + metric: ClientRoundtripLatencyView, + }, + { + metric: ServerLatencyView, + }, + } + // Unregister all the views. Unregistering a view causes a synchronous + // upload of any collected data for the view to any registered exporters. + // Thus, after this unregister call, the exporter has the data to make + // assertions on immediately. + view.Unregister(allViews...) + // Assert the expected emissions for each metric match the expected + // emissions. 
+ for _, wantMetric := range wantMetrics { + metricName := wantMetric.metric.Name + var vi *viewInformation + if vi = fe.seenViews[metricName]; vi == nil { + t.Fatalf("couldn't find %v in the views exported, never collected", metricName) + } + + // For latency metrics, there is a lot of non determinism about + // the exact milliseconds of RPCs that finish. Thus, rather than + // declare the exact data you want, make sure the latency + // measurement points for the two RPCs above fall within buckets + // that fall into less than 5 seconds, which is the rpc timeout. + if metricName == "grpc.io/client/roundtrip_latency" || metricName == "grpc.io/server/server_latency" { + // RPCs have a context timeout of 5s, so all the recorded + // measurements (one per RPC - two total) should fall within 5 + // second buckets. + if err := distributionDataLatencyCount(vi, 2); err != nil { + t.Fatalf("Invalid OpenCensus export view data for metric %v: %v", metricName, err) + } + continue + } + if diff := cmp.Diff(vi, wantMetric.wantVI); diff != "" { + t.Fatalf("got unexpected viewInformation for metric %v, diff (-got, +want): %v", metricName, diff) + } + // Note that this test only fatals with one error if a metric fails. + // This is fine, as all are expected to pass so if a single one fails + // you can figure it out and iterate as needed. + } +} + +// TestOpenCensusTags tests this instrumentation code's ability to propagate +// OpenCensus tags across the wire. It also tests the server stats handler's +// functionality of adding the server method tag for the application to see. The +// test makes an Unary RPC without a tag map and with a tag map, and expects to +// see a tag map at the application layer with server method tag in the first +// case, and a tag map at the application layer with the populated tag map plus +// server method tag in second case. +func (s) TestOpenCensusTags(t *testing.T) { + // This stub servers functions represent the application layer server side. + // This is the intended feature being tested: that open census tags + // populated at the client side application layer end up at the server side + // application layer with the server method tag key in addition to the map + // populated at the client side application layer if populated. + tmCh := testutils.NewChannel() + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + // Do the sends of the tag maps for assertions in this main testing + // goroutine. Do the receives and assertions in a forked goroutine. + if tm := tag.FromContext(ctx); tm != nil { + tmCh.Send(tm) + } else { + tmCh.Send(errors.New("no tag map received server side")) + } + return &grpc_testing.SimpleResponse{}, nil + }, + } + if err := ss.Start([]grpc.ServerOption{ServerOption(TraceOptions{})}, DialOption(TraceOptions{})); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + key1 := tag.MustNewKey("key 1") + wg := sync.WaitGroup{} + wg.Add(1) + readerErrCh := testutils.NewChannel() + // Spawn a goroutine to receive and validation two tag maps received by the + // server application code. + go func() { + defer wg.Done() + unaryCallMethodName := "grpc.testing.TestService/UnaryCall" + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Attempt to receive the tag map from the first RPC. 
+ if tm, err := tmCh.Receive(ctx); err == nil { + tagMap, ok := tm.(*tag.Map) + // Shouldn't happen, this test sends only *tag.Map type on channel. + if !ok { + readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", tm)) + } + // keyServerMethod should be present in this tag map received server + // side. + val, ok := tagMap.Value(keyServerMethod) + if !ok { + readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", keyServerMethod.Name())) + } + if val != unaryCallMethodName { + readerErrCh.Send(fmt.Errorf("serverMethod receieved: %v, want server method: %v", val, unaryCallMethodName)) + } + } else { + readerErrCh.Send(fmt.Errorf("error while waiting for a tag map: %v", err)) + } + readerErrCh.Send(nil) + + // Attempt to receive the tag map from the second RPC. + if tm, err := tmCh.Receive(ctx); err == nil { + tagMap, ok := tm.(*tag.Map) + // Shouldn't happen, this test sends only *tag.Map type on channel. + if !ok { + readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", tm)) + } + // key1: "value1" populated in the tag map client side should make + // it's way to server. + val, ok := tagMap.Value(key1) + if !ok { + readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", key1.Name())) + } + if val != "value1" { + readerErrCh.Send(fmt.Errorf("key %v received: %v, want server method: %v", key1.Name(), val, unaryCallMethodName)) + } + // keyServerMethod should be appended to tag map as well. + val, ok = tagMap.Value(keyServerMethod) + if !ok { + readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", keyServerMethod.Name())) + } + if val != unaryCallMethodName { + readerErrCh.Send(fmt.Errorf("key: %v received: %v, want server method: %v", keyServerMethod.Name(), val, unaryCallMethodName)) + } + } else { + readerErrCh.Send(fmt.Errorf("error while waiting for second tag map: %v", err)) + } + readerErrCh.Send(nil) + }() + + // Make a unary RPC without populating an OpenCensus tag map. The server + // side should receive an OpenCensus tag map containing only the + // keyServerMethod. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Should receive a nil error from the readerErrCh, meaning the reader + // goroutine successfully received a tag map with the keyServerMethod + // populated. + if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { + if err != nil { + t.Fatalf("Should have received something from error channel: %v", err) + } + if chErr != nil { + t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) + } + } + + tm := &tag.Map{} + ctx = tag.NewContext(ctx, tm) + ctx, err := tag.New(ctx, tag.Upsert(key1, "value1")) + // Setup steps like this can fatal, so easier to do the RPC's and subsequent + // sends of the tag maps of the RPC's in main goroutine and have the + // corresponding receives and assertions in a forked goroutine. + if err != nil { + t.Fatalf("Error creating tag map: %v", err) + } + // Make a unary RPC with a populated OpenCensus tag map. The server side + // should receive an OpenCensus tag map containing this populated tag map + // with the keyServerMethod tag appended to it. 
+ if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { + if err != nil { + t.Fatalf("Should have received something from error channel: %v", err) + } + if chErr != nil { + t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) + } + } + + wg.Wait() +} diff --git a/stats/opencensus/go.mod b/stats/opencensus/go.mod index 851683220..9f28368d9 100644 --- a/stats/opencensus/go.mod +++ b/stats/opencensus/go.mod @@ -3,6 +3,7 @@ module google.golang.org/grpc/stats/opencensus go 1.17 require ( + github.com/google/go-cmp v0.5.9 go.opencensus.io v0.24.0 google.golang.org/grpc v1.52.0 ) diff --git a/stats/opencensus/opencensus.go b/stats/opencensus/opencensus.go index ea9368ebb..fe5cdf08c 100644 --- a/stats/opencensus/opencensus.go +++ b/stats/opencensus/opencensus.go @@ -103,10 +103,13 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {} // TagRPC implements per RPC attempt context management. func (csh *clientStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context { + ctx = csh.statsTagRPC(ctx, rti) return ctx } -func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {} +func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + recordRPCData(ctx, rs) +} type serverStatsHandler struct { to TraceOptions @@ -122,8 +125,11 @@ func (ssh *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {} // TagRPC implements per RPC context management. func (ssh *serverStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context { + ctx = ssh.statsTagRPC(ctx, rti) return ctx } // HandleRPC implements per RPC tracing and stats implementation. -func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {} +func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + recordRPCData(ctx, rs) +} diff --git a/stats/opencensus/server_metrics.go b/stats/opencensus/server_metrics.go new file mode 100644 index 000000000..9268271c3 --- /dev/null +++ b/stats/opencensus/server_metrics.go @@ -0,0 +1,115 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opencensus + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + keyServerMethod = tag.MustNewKey("grpc_server_method") + keyServerStatus = tag.MustNewKey("grpc_server_status") +) + +// Measures, which are recorded by server stats handler: Note that on gRPC's +// server side, the per rpc unit is truly per rpc, as there is no concept of a +// rpc attempt server side. +var ( + serverReceivedMessagesPerRPC = stats.Int64("grpc.io/server/received_messages_per_rpc", "Number of messages received in each RPC. 
Has value 1 for non-streaming RPCs.", stats.UnitDimensionless) // the collection/measurement point of this measure handles the /rpc aspect of it + serverReceivedBytesPerRPC = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes) + serverSentMessagesPerRPC = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless) + serverSentBytesPerRPC = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes) + serverStartedRPCs = stats.Int64("grpc.io/server/started_rpcs", "The total number of server RPCs ever opened, including those that have not completed.", stats.UnitDimensionless) + serverLatency = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds) +) + +var ( + // ServerSentMessagesPerRPCView is the distribution of sent messages per + // RPC, keyed on method. + ServerSentMessagesPerRPCView = &view.View{ + Name: "grpc.io/server/sent_messages_per_rpc", + Description: "Distribution of sent messages per RPC, by method.", + TagKeys: []tag.Key{keyServerMethod}, + Measure: serverSentMessagesPerRPC, + Aggregation: countDistribution, + } + // ServerReceivedMessagesPerRPCView is the distribution of received messages + // per RPC, keyed on method. + ServerReceivedMessagesPerRPCView = &view.View{ + Name: "grpc.io/server/received_messages_per_rpc", + Description: "Distribution of received messages per RPC, by method.", + TagKeys: []tag.Key{keyServerMethod}, + Measure: serverReceivedMessagesPerRPC, + Aggregation: countDistribution, + } + // ServerSentBytesPerRPCView is the distribution of received bytes per RPC, + // keyed on method. + ServerSentBytesPerRPCView = &view.View{ + Name: "grpc.io/server/sent_bytes_per_rpc", + Description: "Distribution of sent bytes per RPC, by method.", + Measure: serverSentBytesPerRPC, + TagKeys: []tag.Key{keyServerMethod}, + Aggregation: bytesDistribution, + } + // ServerReceivedBytesPerRPCView is the distribution of sent bytes per RPC, + // keyed on method. + ServerReceivedBytesPerRPCView = &view.View{ + Name: "grpc.io/server/received_bytes_per_rpc", + Description: "Distribution of received bytes per RPC, by method.", + Measure: serverReceivedBytesPerRPC, + TagKeys: []tag.Key{keyServerMethod}, + Aggregation: bytesDistribution, + } + // ServerStartedRPCsView is the count of opened RPCs, keyed on method. + ServerStartedRPCsView = &view.View{ + Measure: serverStartedRPCs, + Name: "grpc.io/server/started_rpcs", + Description: "Number of opened server RPCs, by method.", + TagKeys: []tag.Key{keyServerMethod}, + Aggregation: view.Count(), + } + // ServerCompletedRPCsView is the count of completed RPCs, keyed on + // method and status. + ServerCompletedRPCsView = &view.View{ + Name: "grpc.io/server/completed_rpcs", + Description: "Number of completed RPCs by method and status.", + TagKeys: []tag.Key{keyServerMethod, keyServerStatus}, + Measure: serverLatency, + Aggregation: view.Count(), + } + // ServerLatencyView is the distribution of server latency in milliseconds + // per RPC, keyed on method. 
+ ServerLatencyView = &view.View{ + Name: "grpc.io/server/server_latency", + Description: "Distribution of server latency in milliseconds, by method.", + TagKeys: []tag.Key{keyServerMethod}, + Measure: serverLatency, + Aggregation: millisecondsDistribution, + } +) + +// DefaultServerViews is the set of server views which are considered the +// minimum required to monitor server side performance. +var DefaultServerViews = []*view.View{ + ServerReceivedBytesPerRPCView, + ServerSentBytesPerRPCView, + ServerLatencyView, + ServerCompletedRPCsView, + ServerStartedRPCsView, +} diff --git a/stats/opencensus/stats.go b/stats/opencensus/stats.go new file mode 100644 index 000000000..35a929393 --- /dev/null +++ b/stats/opencensus/stats.go @@ -0,0 +1,215 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opencensus + +import ( + "context" + "strings" + "sync/atomic" + "time" + + ocstats "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +var logger = grpclog.Component("opencensus-instrumentation") + +var canonicalString = internal.CanonicalString.(func(codes.Code) string) + +type rpcDataKey struct{} + +func setRPCData(ctx context.Context, d *rpcData) context.Context { + return context.WithValue(ctx, rpcDataKey{}, d) +} + +var ( + // bounds separate variable for testing purposes. + bytesDistributionBounds = []float64{1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} + bytesDistribution = view.Distribution(bytesDistributionBounds...) + millisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) + countDistributionBounds = []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536} + countDistribution = view.Distribution(countDistributionBounds...) +) + +func removeLeadingSlash(mn string) string { + return strings.TrimLeft(mn, "/") +} + +// rpcData is data about the rpc attempt client side, and the overall rpc server +// side. 
+type rpcData struct { + // access these counts atomically for hedging in the future + // number of messages sent from side (client || server) + sentMsgs int64 + // number of bytes sent (within each message) from side (client || server) + sentBytes int64 + // number of messages received on side (client || server) + recvMsgs int64 + // number of bytes received (within each message) received on side (client + // || server) + recvBytes int64 + + startTime time.Time + method string +} + +// statsTagRPC creates a recording object to derive measurements from in the +// context, scoping the recordings to per RPC Attempt client side (scope of the +// context). It also populates the gRPC Metadata within the context with any +// opencensus specific tags set by the application in the context, binary +// encoded to send across the wire. +func (csh *clientStatsHandler) statsTagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + d := &rpcData{ + startTime: time.Now(), + method: info.FullMethodName, + } + // Populate gRPC Metadata with OpenCensus tag map if set by application. + if tm := tag.FromContext(ctx); tm != nil { + ctx = stats.SetTags(ctx, tag.Encode(tm)) + } + return setRPCData(ctx, d) +} + +// statsTagRPC creates a recording object to derive measurements from in the +// context, scoping the recordings to per RPC server side (scope of the +// context). It also deserializes the opencensus tags set in the context's gRPC +// Metadata, and adds a server method tag to the opencensus tags. +func (ssh *serverStatsHandler) statsTagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + d := &rpcData{ + startTime: time.Now(), + method: info.FullMethodName, + } + if tagsBin := stats.Tags(ctx); tagsBin != nil { + if tags, err := tag.Decode(tagsBin); err == nil { + ctx = tag.NewContext(ctx, tags) + } + } + // We can ignore the error here because in the error case, the context + // passed in is returned. If the call errors, the server side application + // layer won't get this key server method information in the tag map, but + // this instrumentation code will function as normal. + ctx, _ = tag.New(ctx, tag.Upsert(keyServerMethod, removeLeadingSlash(info.FullMethodName))) + return setRPCData(ctx, d) +} + +func recordRPCData(ctx context.Context, s stats.RPCStats) { + d, ok := ctx.Value(rpcDataKey{}).(*rpcData) + if !ok { + // Shouldn't happen, as gRPC calls TagRPC which populates the rpcData in + // context. + return + } + switch st := s.(type) { + case *stats.InHeader, *stats.OutHeader, *stats.InTrailer, *stats.OutTrailer: + // Headers and Trailers are not relevant to the measures, as the + // measures concern number of messages and bytes for messages. This + // aligns with flow control. + case *stats.Begin: + recordDataBegin(ctx, d, st) + case *stats.OutPayload: + recordDataOutPayload(d, st) + case *stats.InPayload: + recordDataInPayload(d, st) + case *stats.End: + recordDataEnd(ctx, d, st) + default: + // Shouldn't happen. gRPC calls into stats handler, and will never not + // be one of the types above. + logger.Errorf("Received unexpected stats type (%T) with data: %v", s, s) + } +} + +// recordDataBegin takes a measurement related to the RPC beginning, +// client/server started RPCs dependent on the caller. 
+func recordDataBegin(ctx context.Context, d *rpcData, b *stats.Begin) { + if b.Client { + ocstats.RecordWithOptions(ctx, + ocstats.WithTags(tag.Upsert(keyClientMethod, removeLeadingSlash(d.method))), + ocstats.WithMeasurements(clientStartedRPCs.M(1))) + return + } + ocstats.RecordWithOptions(ctx, + ocstats.WithTags(tag.Upsert(keyServerMethod, removeLeadingSlash(d.method))), + ocstats.WithMeasurements(serverStartedRPCs.M(1))) +} + +// recordDataOutPayload records the length in bytes of outgoing messages and +// increases total count of sent messages both stored in the RPCs (attempt on +// client side) context for use in taking measurements at RPC end. +func recordDataOutPayload(d *rpcData, op *stats.OutPayload) { + atomic.AddInt64(&d.sentMsgs, 1) + atomic.AddInt64(&d.sentBytes, int64(op.Length)) +} + +// recordDataInPayload records the length in bytes of incoming messages and +// increases total count of sent messages both stored in the RPCs (attempt on +// client side) context for use in taking measurements at RPC end. +func recordDataInPayload(d *rpcData, ip *stats.InPayload) { + atomic.AddInt64(&d.recvMsgs, 1) + atomic.AddInt64(&d.recvBytes, int64(ip.Length)) +} + +// recordDataEnd takes per RPC measurements derived from information derived +// from the lifetime of the RPC (RPC attempt client side). +func recordDataEnd(ctx context.Context, d *rpcData, e *stats.End) { + // latency bounds for distribution data (speced millisecond bounds) have + // fractions, thus need a float. + latency := float64(time.Since(d.startTime)) / float64(time.Millisecond) + var st string + if e.Error != nil { + s, _ := status.FromError(e.Error) + st = canonicalString(s.Code()) + } else { + st = "OK" + } + + // TODO: Attach trace data through attachments?!?! + + if e.Client { + ocstats.RecordWithOptions(ctx, + ocstats.WithTags( + tag.Upsert(keyClientMethod, removeLeadingSlash(d.method)), + tag.Upsert(keyClientStatus, st)), + ocstats.WithMeasurements( + clientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)), + clientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentMsgs)), + clientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvMsgs)), + clientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)), + clientRoundtripLatency.M(latency), + clientServerLatency.M(latency), + )) + return + } + ocstats.RecordWithOptions(ctx, + ocstats.WithTags( + tag.Upsert(keyServerMethod, removeLeadingSlash(d.method)), + tag.Upsert(keyServerStatus, st), + ), + ocstats.WithMeasurements( + serverSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)), + serverSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentMsgs)), + serverReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvMsgs)), + serverReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)), + serverLatency.M(latency))) +}
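
A minimal usage sketch, for reference alongside the patch above: it registers the DefaultClientViews and DefaultServerViews exported by this package and attaches the stats handlers through the DialOption and ServerOption constructors that e2e_test.go exercises. The target address and the insecure transport credentials are placeholders, only the zero value of TraceOptions is assumed, and any real exporter would be registered via view.RegisterExporter (the e2e test uses an in-process fake exporter).

package main

import (
	"log"

	"go.opencensus.io/stats/view"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	ocgrpc "google.golang.org/grpc/stats/opencensus"
)

func main() {
	// Register the per-RPC views defined in client_metrics.go and
	// server_metrics.go so that measurements recorded by the stats handlers
	// are aggregated and made available to registered exporters.
	if err := view.Register(ocgrpc.DefaultClientViews...); err != nil {
		log.Fatalf("Failed to register client views: %v", err)
	}
	if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
		log.Fatalf("Failed to register server views: %v", err)
	}
	defer view.Unregister(ocgrpc.DefaultClientViews...)
	defer view.Unregister(ocgrpc.DefaultServerViews...)

	// Server side: install the OpenCensus stats handler on the gRPC server.
	srv := grpc.NewServer(ocgrpc.ServerOption(ocgrpc.TraceOptions{}))
	defer srv.Stop()

	// Client side: install the OpenCensus stats handler on the ClientConn.
	// "localhost:50051" and the insecure credentials are placeholders for
	// this sketch.
	cc, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		ocgrpc.DialOption(ocgrpc.TraceOptions{}),
	)
	if err != nil {
		log.Fatalf("grpc.Dial failed: %v", err)
	}
	defer cc.Close()

	// ... create service stubs on cc, register services on srv, and serve.
}

As relied on in TestAllMetricsOneFunction above, view.Unregister synchronously uploads any collected data for a view to registered exporters, so unregistering the views before shutdown flushes the final aggregates.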