mirror of https://github.com/grpc/grpc-go.git

stats/opencensus: Add OpenCensus metrics support (#5923)

parent 3151e834fa
commit f69e9ad8d4
@@ -18,7 +18,15 @@

 package codes

-import "strconv"
+import (
+	"strconv"
+
+	"google.golang.org/grpc/internal"
+)
+
+func init() {
+	internal.CanonicalString = canonicalString
+}

 func (c Code) String() string {
 	switch c {
@@ -60,3 +68,44 @@ func (c Code) String() string {
 		return "Code(" + strconv.FormatInt(int64(c), 10) + ")"
 	}
 }
+
+func canonicalString(c Code) string {
+	switch c {
+	case OK:
+		return "OK"
+	case Canceled:
+		return "CANCELLED"
+	case Unknown:
+		return "UNKNOWN"
+	case InvalidArgument:
+		return "INVALID_ARGUMENT"
+	case DeadlineExceeded:
+		return "DEADLINE_EXCEEDED"
+	case NotFound:
+		return "NOT_FOUND"
+	case AlreadyExists:
+		return "ALREADY_EXISTS"
+	case PermissionDenied:
+		return "PERMISSION_DENIED"
+	case ResourceExhausted:
+		return "RESOURCE_EXHAUSTED"
+	case FailedPrecondition:
+		return "FAILED_PRECONDITION"
+	case Aborted:
+		return "ABORTED"
+	case OutOfRange:
+		return "OUT_OF_RANGE"
+	case Unimplemented:
+		return "UNIMPLEMENTED"
+	case Internal:
+		return "INTERNAL"
+	case Unavailable:
+		return "UNAVAILABLE"
+	case DataLoss:
+		return "DATA_LOSS"
+	case Unauthenticated:
+		return "UNAUTHENTICATED"
+	default:
+		return "CODE(" + strconv.FormatInt(int64(c), 10) + ")"
+	}
+}
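The canonical names above intentionally differ from the Go-style names produced by the existing Code.String method. A minimal sketch of the distinction (illustrative only; canonicalString itself is unexported, so outside the codes package it is reachable only through the internal.CanonicalString hook set in the init function above):

// Illustrative sketch: the same code rendered two ways. String() is the
// existing exported method; the canonical form is what canonicalString
// returns and what the opencensus package records as the status tag value.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
)

func main() {
	c := codes.DeadlineExceeded
	fmt.Println(c.String()) // prints "DeadlineExceeded"
	// canonicalString(c) (exposed via internal.CanonicalString) would return
	// "DEADLINE_EXCEEDED", matching grpc/doc/statuscodes.md.
}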
@@ -61,6 +61,9 @@ var (
 	// gRPC server. An xDS-enabled server needs to know what type of credentials
 	// is configured on the underlying gRPC server. This is set by server.go.
 	GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
+	// CanonicalString returns the canonical string of the code defined here:
+	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
+	CanonicalString interface{} // func (codes.Code) string
 	// DrainServerTransports initiates a graceful close of existing connections
 	// on a gRPC server accepted on the provided listener address. An
 	// xDS-enabled server invokes this method on a grpc.Server when a particular
@@ -0,0 +1,116 @@
/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package opencensus

import (
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	keyClientMethod = tag.MustNewKey("grpc_client_method")
	keyClientStatus = tag.MustNewKey("grpc_client_status")
)

// Measures recorded by the client stats handler. Note that, due to the way
// stats handlers are called on gRPC's client side, the per-RPC unit is
// actually per attempt throughout this definition file.
var (
	clientSentMessagesPerRPC     = stats.Int64("grpc.io/client/sent_messages_per_rpc", "Number of messages sent in the RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless)
	clientSentBytesPerRPC        = stats.Int64("grpc.io/client/sent_bytes_per_rpc", "Total bytes sent across all request messages per RPC.", stats.UnitBytes)
	clientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless)
	clientReceivedBytesPerRPC    = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes)
	clientRoundtripLatency       = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds)
	clientStartedRPCs            = stats.Int64("grpc.io/client/started_rpcs", "The total number of client RPCs ever opened, including those that have not completed.", stats.UnitDimensionless)
	clientServerLatency          = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds)
)

var (
	// ClientSentMessagesPerRPCView is the distribution of sent messages per
	// RPC, keyed on method.
	ClientSentMessagesPerRPCView = &view.View{
		Measure:     clientSentMessagesPerRPC,
		Name:        "grpc.io/client/sent_messages_per_rpc",
		Description: "Distribution of sent messages per RPC, by method.",
		TagKeys:     []tag.Key{keyClientMethod},
		Aggregation: countDistribution,
	}
	// ClientReceivedMessagesPerRPCView is the distribution of received messages
	// per RPC, keyed on method.
	ClientReceivedMessagesPerRPCView = &view.View{
		Measure:     clientReceivedMessagesPerRPC,
		Name:        "grpc.io/client/received_messages_per_rpc",
		Description: "Distribution of received messages per RPC, by method.",
		TagKeys:     []tag.Key{keyClientMethod},
		Aggregation: countDistribution,
	}
	// ClientSentBytesPerRPCView is the distribution of sent bytes per RPC,
	// keyed on method.
	ClientSentBytesPerRPCView = &view.View{
		Measure:     clientSentBytesPerRPC,
		Name:        "grpc.io/client/sent_bytes_per_rpc",
		Description: "Distribution of sent bytes per RPC, by method.",
		TagKeys:     []tag.Key{keyClientMethod},
		Aggregation: bytesDistribution,
	}
	// ClientReceivedBytesPerRPCView is the distribution of received bytes per
	// RPC, keyed on method.
	ClientReceivedBytesPerRPCView = &view.View{
		Measure:     clientReceivedBytesPerRPC,
		Name:        "grpc.io/client/received_bytes_per_rpc",
		Description: "Distribution of received bytes per RPC, by method.",
		TagKeys:     []tag.Key{keyClientMethod},
		Aggregation: bytesDistribution,
	}
	// ClientStartedRPCsView is the count of opened RPCs, keyed on method.
	ClientStartedRPCsView = &view.View{
		Measure:     clientStartedRPCs,
		Name:        "grpc.io/client/started_rpcs",
		Description: "Number of opened client RPCs, by method.",
		TagKeys:     []tag.Key{keyClientMethod},
		Aggregation: view.Count(),
	}
	// ClientCompletedRPCsView is the count of completed RPCs, keyed on method
	// and status.
	ClientCompletedRPCsView = &view.View{
		Measure:     clientRoundtripLatency,
		Name:        "grpc.io/client/completed_rpcs",
		Description: "Number of completed RPCs by method and status.",
		TagKeys:     []tag.Key{keyClientMethod, keyClientStatus},
		Aggregation: view.Count(),
	}
	// ClientRoundtripLatencyView is the distribution of round-trip latency in
	// milliseconds per RPC, keyed on method.
	ClientRoundtripLatencyView = &view.View{
		Measure:     clientRoundtripLatency,
		Name:        "grpc.io/client/roundtrip_latency",
		Description: "Distribution of round-trip latency, by method.",
		TagKeys:     []tag.Key{keyClientMethod},
		Aggregation: millisecondsDistribution,
	}
)

// DefaultClientViews is the set of client views which are considered the
// minimum required to monitor client-side performance.
var DefaultClientViews = []*view.View{
	ClientSentBytesPerRPCView,
	ClientReceivedBytesPerRPCView,
	ClientRoundtripLatencyView,
	ClientCompletedRPCsView,
	ClientStartedRPCsView,
}
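A minimal client-side usage sketch for the views and handler added above (illustrative only; the target address and exporter wiring are assumptions, while DefaultClientViews, DialOption, and TraceOptions are the identifiers exercised by the tests later in this diff):

// Illustrative sketch only: register the default client views and install the
// OpenCensus stats handler on a ClientConn. Exporter setup is
// application-specific and assumed to exist elsewhere.
package main

import (
	"log"

	"go.opencensus.io/stats/view"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/stats/opencensus"
)

func main() {
	// Register the minimum set of client views so measurements are exported.
	if err := view.Register(opencensus.DefaultClientViews...); err != nil {
		log.Fatalf("failed to register client views: %v", err)
	}
	defer view.Unregister(opencensus.DefaultClientViews...)

	// DialOption(TraceOptions{}) attaches the stats handler that records the
	// client measures defined above for every RPC attempt.
	cc, err := grpc.Dial("localhost:50051", // hypothetical address
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		opencensus.DialOption(opencensus.TraceOptions{}),
	)
	if err != nil {
		log.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	_ = cc // make RPCs with generated stubs here
}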
@ -0,0 +1,940 @@
|
|||
/*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package opencensus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/leakcheck"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/test/grpc_testing"
|
||||
)
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
func init() {
|
||||
// OpenCensus, once included in the binary, spawns a global goroutine
// recorder that is not controllable by the application.
// https://github.com/census-instrumentation/opencensus-go/issues/1191
|
||||
leakcheck.RegisterIgnoreGoroutine("go.opencensus.io/stats/view.(*worker).start")
|
||||
}
|
||||
|
||||
var defaultTestTimeout = 5 * time.Second
|
||||
|
||||
type fakeExporter struct {
|
||||
t *testing.T
|
||||
|
||||
mu sync.RWMutex
|
||||
seenViews map[string]*viewInformation
|
||||
}
|
||||
|
||||
// viewInformation is the information exported from the view package through
// ExportView that is relevant to testing, i.e. a reasonably non-flaky
// expectation of the desired emissions to the Exporter.
|
||||
type viewInformation struct {
|
||||
aggType view.AggType
|
||||
aggBuckets []float64
|
||||
desc string
|
||||
tagKeys []tag.Key
|
||||
rows []*view.Row
|
||||
}
|
||||
|
||||
func (fe *fakeExporter) ExportView(vd *view.Data) {
|
||||
fe.mu.Lock()
|
||||
defer fe.mu.Unlock()
|
||||
fe.seenViews[vd.View.Name] = &viewInformation{
|
||||
aggType: vd.View.Aggregation.Type,
|
||||
aggBuckets: vd.View.Aggregation.Buckets,
|
||||
desc: vd.View.Description,
|
||||
tagKeys: vd.View.TagKeys,
|
||||
rows: vd.Rows,
|
||||
}
|
||||
}
|
||||
|
||||
// compareRows compares rows with respect to the information desired to test.
|
||||
// Both the tags representing the rows and also the data of the row are tested
|
||||
// for equality. Rows are in nondeterministic order when ExportView is called,
|
||||
// but handled inside this function by sorting.
|
||||
func compareRows(rows []*view.Row, rows2 []*view.Row) bool {
|
||||
if len(rows) != len(rows2) {
|
||||
return false
|
||||
}
|
||||
// Sort both rows according to the same rule. This is to take away non
|
||||
// determinism in the row ordering passed to the Exporter, while keeping the
|
||||
// row data.
|
||||
sort.Slice(rows, func(i, j int) bool {
|
||||
return rows[i].String() > rows[j].String()
|
||||
})
|
||||
|
||||
sort.Slice(rows2, func(i, j int) bool {
|
||||
return rows2[i].String() > rows2[j].String()
|
||||
})
|
||||
|
||||
for i, row := range rows {
|
||||
if !cmp.Equal(row.Tags, rows2[i].Tags, cmp.Comparer(func(a tag.Key, b tag.Key) bool {
|
||||
return a.Name() == b.Name()
|
||||
})) {
|
||||
return false
|
||||
}
|
||||
if !compareData(row.Data, rows2[i].Data) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// compareData returns whether the two aggregation data's are equal to each
|
||||
// other with respect to parts of the data desired for correct emission. The
|
||||
// function first makes sure the two types of aggregation data are the same, and
|
||||
// then checks the equality for the respective aggregation data type.
|
||||
func compareData(ad view.AggregationData, ad2 view.AggregationData) bool {
|
||||
if ad == nil && ad2 == nil {
|
||||
return true
|
||||
}
|
||||
if ad == nil || ad2 == nil {
|
||||
return false
|
||||
}
|
||||
if reflect.TypeOf(ad) != reflect.TypeOf(ad2) {
|
||||
return false
|
||||
}
|
||||
switch ad1 := ad.(type) {
|
||||
case *view.DistributionData:
|
||||
dd2 := ad2.(*view.DistributionData)
|
||||
// Count and Count Per Buckets are reasonable for correctness,
|
||||
// especially since we verify equality of bucket endpoints elsewhere.
|
||||
if ad1.Count != dd2.Count {
|
||||
return false
|
||||
}
|
||||
for i, count := range ad1.CountPerBucket {
|
||||
if count != dd2.CountPerBucket[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
case *view.CountData:
|
||||
cd2 := ad2.(*view.CountData)
|
||||
return ad1.Value == cd2.Value
|
||||
|
||||
// gRPC open census plugin does not have these next two types of aggregation
|
||||
// data types present, for now just check for type equality between the two
|
||||
// aggregation data points (done above).
|
||||
// case *view.SumData
|
||||
// case *view.LastValueData:
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (vi *viewInformation) Equal(vi2 *viewInformation) bool {
|
||||
if vi == nil && vi2 == nil {
|
||||
return true
|
||||
}
|
||||
if vi == nil || vi2 == nil {
|
||||
return false
|
||||
}
|
||||
if vi.aggType != vi2.aggType {
|
||||
return false
|
||||
}
|
||||
if !cmp.Equal(vi.aggBuckets, vi2.aggBuckets) {
|
||||
return false
|
||||
}
|
||||
if vi.desc != vi2.desc {
|
||||
return false
|
||||
}
|
||||
if !cmp.Equal(vi.tagKeys, vi2.tagKeys, cmp.Comparer(func(a tag.Key, b tag.Key) bool {
|
||||
return a.Name() == b.Name()
|
||||
})) {
|
||||
return false
|
||||
}
|
||||
if !compareRows(vi.rows, vi2.rows) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// distributionDataLatencyCount checks if the view information contains the
// desired distribution latency total count that falls in buckets of 5 seconds
// or less. It must be called with non-nil view information aggregated with
// distribution data. It returns a nil error if the expected count is found,
// and a non-nil error otherwise.
|
||||
func distributionDataLatencyCount(vi *viewInformation, countWant int64) error {
|
||||
var totalCount int64
|
||||
var largestIndexWithFive int
|
||||
for i, bucket := range vi.aggBuckets {
|
||||
// Distribution for latency is measured in milliseconds, so 5 * 1000 =
|
||||
// 5000.
|
||||
if bucket > 5000 {
|
||||
largestIndexWithFive = i
|
||||
break
|
||||
}
|
||||
}
|
||||
// Iterating through rows sums up data points for all methods. In this case,
|
||||
// a data point for the unary and for the streaming RPC.
|
||||
for _, row := range vi.rows {
|
||||
// This could potentially have an extra measurement in buckets above 5s,
|
||||
// but that's fine. Count of buckets that could contain up to 5s is a
|
||||
// good enough assertion.
|
||||
for i, count := range row.Data.(*view.DistributionData).CountPerBucket {
|
||||
if i >= largestIndexWithFive {
|
||||
break
|
||||
}
|
||||
totalCount = totalCount + count
|
||||
}
|
||||
}
|
||||
if totalCount != countWant {
|
||||
return fmt.Errorf("wrong total count for counts under 5: %v, wantCount: %v", totalCount, countWant)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestAllMetricsOneFunction tests emitted metrics from gRPC. It registers all
|
||||
// the metrics provided by this package. It then configures a system with a gRPC
|
||||
// Client and gRPC server with the OpenCensus Dial and Server Option configured,
|
||||
// and makes a Unary RPC and a Streaming RPC. These two RPCs should cause
|
||||
// certain emissions for each registered metric through the OpenCensus View
|
||||
// package.
|
||||
func (s) TestAllMetricsOneFunction(t *testing.T) {
|
||||
allViews := []*view.View{
|
||||
ClientStartedRPCsView,
|
||||
ServerStartedRPCsView,
|
||||
ClientCompletedRPCsView,
|
||||
ServerCompletedRPCsView,
|
||||
ClientSentBytesPerRPCView,
|
||||
ServerSentBytesPerRPCView,
|
||||
ClientReceivedBytesPerRPCView,
|
||||
ServerReceivedBytesPerRPCView,
|
||||
ClientSentMessagesPerRPCView,
|
||||
ServerSentMessagesPerRPCView,
|
||||
ClientReceivedMessagesPerRPCView,
|
||||
ServerReceivedMessagesPerRPCView,
|
||||
ClientRoundtripLatencyView,
|
||||
ServerLatencyView,
|
||||
}
|
||||
view.Register(allViews...)
|
||||
// Unregister unconditionally in this defer to correctly cleanup globals in
|
||||
// error conditions.
|
||||
defer view.Unregister(allViews...)
|
||||
fe := &fakeExporter{
|
||||
t: t,
|
||||
seenViews: make(map[string]*viewInformation),
|
||||
}
|
||||
view.RegisterExporter(fe)
|
||||
defer view.UnregisterExporter(fe)
|
||||
|
||||
ss := &stubserver.StubServer{
|
||||
UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
|
||||
return &grpc_testing.SimpleResponse{}, nil
|
||||
},
|
||||
FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
if err := ss.Start([]grpc.ServerOption{ServerOption(TraceOptions{})}, DialOption(TraceOptions{})); err != nil {
|
||||
t.Fatalf("Error starting endpoint server: %v", err)
|
||||
}
|
||||
defer ss.Stop()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
// Make two RPC's, a unary RPC and a streaming RPC. These should cause
|
||||
// certain metrics to be emitted.
|
||||
if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil {
|
||||
t.Fatalf("Unexpected error from UnaryCall: %v", err)
|
||||
}
|
||||
stream, err := ss.Client.FullDuplexCall(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
|
||||
}
|
||||
|
||||
stream.CloseSend()
|
||||
if _, err = stream.Recv(); err != io.EOF {
|
||||
t.Fatalf("unexpected error: %v, expected an EOF error", err)
|
||||
}
|
||||
|
||||
cmtk := tag.MustNewKey("grpc_client_method")
|
||||
smtk := tag.MustNewKey("grpc_server_method")
|
||||
cstk := tag.MustNewKey("grpc_client_status")
|
||||
sstk := tag.MustNewKey("grpc_server_status")
|
||||
wantMetrics := []struct {
|
||||
metric *view.View
|
||||
wantVI *viewInformation
|
||||
}{
|
||||
{
|
||||
metric: ClientStartedRPCsView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeCount,
|
||||
aggBuckets: []float64{},
|
||||
desc: "Number of opened client RPCs, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
cmtk,
|
||||
},
|
||||
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ServerStartedRPCsView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeCount,
|
||||
aggBuckets: []float64{},
|
||||
desc: "Number of opened server RPCs, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
smtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ClientCompletedRPCsView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeCount,
|
||||
aggBuckets: []float64{},
|
||||
desc: "Number of completed RPCs by method and status.",
|
||||
tagKeys: []tag.Key{
|
||||
cmtk,
|
||||
cstk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
{
|
||||
Key: cstk,
|
||||
Value: "OK",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
{
|
||||
Key: cstk,
|
||||
Value: "OK",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ServerCompletedRPCsView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeCount,
|
||||
aggBuckets: []float64{},
|
||||
desc: "Number of completed RPCs by method and status.",
|
||||
tagKeys: []tag.Key{
|
||||
smtk,
|
||||
sstk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
{
|
||||
Key: sstk,
|
||||
Value: "OK",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
{
|
||||
Key: sstk,
|
||||
Value: "OK",
|
||||
},
|
||||
},
|
||||
Data: &view.CountData{
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ClientSentBytesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: bytesDistributionBounds,
|
||||
desc: "Distribution of sent bytes per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
cmtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ServerSentBytesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: bytesDistributionBounds,
|
||||
desc: "Distribution of sent bytes per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
smtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
metric: ClientReceivedBytesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: bytesDistributionBounds,
|
||||
desc: "Distribution of received bytes per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
cmtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ServerReceivedBytesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: bytesDistributionBounds,
|
||||
desc: "Distribution of received bytes per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
smtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ClientSentMessagesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: countDistributionBounds,
|
||||
desc: "Distribution of sent messages per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
cmtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ServerSentMessagesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: countDistributionBounds,
|
||||
desc: "Distribution of sent messages per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
smtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ClientReceivedMessagesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: countDistributionBounds,
|
||||
desc: "Distribution of received messages per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
cmtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: cmtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ServerReceivedMessagesPerRPCView,
|
||||
wantVI: &viewInformation{
|
||||
aggType: view.AggTypeDistribution,
|
||||
aggBuckets: countDistributionBounds,
|
||||
desc: "Distribution of received messages per RPC, by method.",
|
||||
tagKeys: []tag.Key{
|
||||
smtk,
|
||||
},
|
||||
rows: []*view.Row{
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/UnaryCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Tags: []tag.Tag{
|
||||
{
|
||||
Key: smtk,
|
||||
Value: "grpc.testing.TestService/FullDuplexCall",
|
||||
},
|
||||
},
|
||||
Data: &view.DistributionData{
|
||||
Count: 1,
|
||||
CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
metric: ClientRoundtripLatencyView,
|
||||
},
|
||||
{
|
||||
metric: ServerLatencyView,
|
||||
},
|
||||
}
|
||||
// Unregister all the views. Unregistering a view causes a synchronous
|
||||
// upload of any collected data for the view to any registered exporters.
|
||||
// Thus, after this unregister call, the exporter has the data to make
|
||||
// assertions on immediately.
|
||||
view.Unregister(allViews...)
|
||||
// Assert that the emissions for each metric match the expected emissions.
|
||||
for _, wantMetric := range wantMetrics {
|
||||
metricName := wantMetric.metric.Name
|
||||
var vi *viewInformation
|
||||
if vi = fe.seenViews[metricName]; vi == nil {
|
||||
t.Fatalf("couldn't find %v in the views exported, never collected", metricName)
|
||||
}
|
||||
|
||||
// For latency metrics, there is a lot of nondeterminism in the exact
// number of milliseconds an RPC takes to finish. Thus, rather than
// asserting on exact data, make sure the latency measurement points for
// the two RPCs above fall within buckets of less than 5 seconds, which
// is the RPC timeout.
|
||||
if metricName == "grpc.io/client/roundtrip_latency" || metricName == "grpc.io/server/server_latency" {
|
||||
// RPCs have a context timeout of 5s, so all the recorded
|
||||
// measurements (one per RPC - two total) should fall within 5
|
||||
// second buckets.
|
||||
if err := distributionDataLatencyCount(vi, 2); err != nil {
|
||||
t.Fatalf("Invalid OpenCensus export view data for metric %v: %v", metricName, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if diff := cmp.Diff(vi, wantMetric.wantVI); diff != "" {
|
||||
t.Fatalf("got unexpected viewInformation for metric %v, diff (-got, +want): %v", metricName, diff)
|
||||
}
|
||||
// Note that this test only fatals with one error if a metric fails.
|
||||
// This is fine, as all are expected to pass so if a single one fails
|
||||
// you can figure it out and iterate as needed.
|
||||
}
|
||||
}
|
||||
|
||||
// TestOpenCensusTags tests this instrumentation code's ability to propagate
// OpenCensus tags across the wire. It also tests the server stats handler's
// functionality of adding the server method tag for the application to see.
// The test makes a unary RPC without a tag map and one with a tag map, and
// expects to see a tag map at the application layer containing the server
// method tag in the first case, and the populated tag map plus the server
// method tag in the second case.
|
||||
func (s) TestOpenCensusTags(t *testing.T) {
|
||||
// This stub server's functions represent the application layer on the
// server side. This is the intended feature being tested: that OpenCensus
// tags populated at the client-side application layer end up at the
// server-side application layer, with the server method tag key added to
// whatever tag map the client populated.
|
||||
tmCh := testutils.NewChannel()
|
||||
ss := &stubserver.StubServer{
|
||||
UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
|
||||
// Do the sends of the tag maps for assertions in this main testing
|
||||
// goroutine. Do the receives and assertions in a forked goroutine.
|
||||
if tm := tag.FromContext(ctx); tm != nil {
|
||||
tmCh.Send(tm)
|
||||
} else {
|
||||
tmCh.Send(errors.New("no tag map received server side"))
|
||||
}
|
||||
return &grpc_testing.SimpleResponse{}, nil
|
||||
},
|
||||
}
|
||||
if err := ss.Start([]grpc.ServerOption{ServerOption(TraceOptions{})}, DialOption(TraceOptions{})); err != nil {
|
||||
t.Fatalf("Error starting endpoint server: %v", err)
|
||||
}
|
||||
defer ss.Stop()
|
||||
|
||||
key1 := tag.MustNewKey("key 1")
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
readerErrCh := testutils.NewChannel()
|
||||
// Spawn a goroutine to receive and validate the two tag maps received by the
// server application code.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
unaryCallMethodName := "grpc.testing.TestService/UnaryCall"
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
// Attempt to receive the tag map from the first RPC.
|
||||
if tm, err := tmCh.Receive(ctx); err == nil {
|
||||
tagMap, ok := tm.(*tag.Map)
|
||||
// Shouldn't happen, this test sends only *tag.Map type on channel.
|
||||
if !ok {
|
||||
readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", tm))
|
||||
}
|
||||
// keyServerMethod should be present in this tag map received server
|
||||
// side.
|
||||
val, ok := tagMap.Value(keyServerMethod)
|
||||
if !ok {
|
||||
readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", keyServerMethod.Name()))
|
||||
}
|
||||
if val != unaryCallMethodName {
|
||||
readerErrCh.Send(fmt.Errorf("serverMethod receieved: %v, want server method: %v", val, unaryCallMethodName))
|
||||
}
|
||||
} else {
|
||||
readerErrCh.Send(fmt.Errorf("error while waiting for a tag map: %v", err))
|
||||
}
|
||||
readerErrCh.Send(nil)
|
||||
|
||||
// Attempt to receive the tag map from the second RPC.
|
||||
if tm, err := tmCh.Receive(ctx); err == nil {
|
||||
tagMap, ok := tm.(*tag.Map)
|
||||
// Shouldn't happen, this test sends only *tag.Map type on channel.
|
||||
if !ok {
|
||||
readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", tm))
|
||||
}
|
||||
// key1: "value1" populated in the tag map client side should make
|
||||
// it's way to server.
|
||||
val, ok := tagMap.Value(key1)
|
||||
if !ok {
|
||||
readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", key1.Name()))
|
||||
}
|
||||
if val != "value1" {
|
||||
readerErrCh.Send(fmt.Errorf("key %v received: %v, want server method: %v", key1.Name(), val, unaryCallMethodName))
|
||||
}
|
||||
// keyServerMethod should be appended to tag map as well.
|
||||
val, ok = tagMap.Value(keyServerMethod)
|
||||
if !ok {
|
||||
readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", keyServerMethod.Name()))
|
||||
}
|
||||
if val != unaryCallMethodName {
|
||||
readerErrCh.Send(fmt.Errorf("key: %v received: %v, want server method: %v", keyServerMethod.Name(), val, unaryCallMethodName))
|
||||
}
|
||||
} else {
|
||||
readerErrCh.Send(fmt.Errorf("error while waiting for second tag map: %v", err))
|
||||
}
|
||||
readerErrCh.Send(nil)
|
||||
}()
|
||||
|
||||
// Make a unary RPC without populating an OpenCensus tag map. The server
|
||||
// side should receive an OpenCensus tag map containing only the
|
||||
// keyServerMethod.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil {
|
||||
t.Fatalf("Unexpected error from UnaryCall: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
// Should receive a nil error from the readerErrCh, meaning the reader
|
||||
// goroutine successfully received a tag map with the keyServerMethod
|
||||
// populated.
|
||||
if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil {
|
||||
if err != nil {
|
||||
t.Fatalf("Should have received something from error channel: %v", err)
|
||||
}
|
||||
if chErr != nil {
|
||||
t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr)
|
||||
}
|
||||
}
|
||||
|
||||
tm := &tag.Map{}
|
||||
ctx = tag.NewContext(ctx, tm)
|
||||
ctx, err := tag.New(ctx, tag.Upsert(key1, "value1"))
|
||||
// Setup steps like this can fatal, so it is easier to do the RPCs and the
// subsequent sends of their tag maps in the main goroutine, and have the
// corresponding receives and assertions in a forked goroutine.
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating tag map: %v", err)
|
||||
}
|
||||
// Make a unary RPC with a populated OpenCensus tag map. The server side
|
||||
// should receive an OpenCensus tag map containing this populated tag map
|
||||
// with the keyServerMethod tag appended to it.
|
||||
if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil {
|
||||
t.Fatalf("Unexpected error from UnaryCall: %v", err)
|
||||
}
|
||||
if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil {
|
||||
if err != nil {
|
||||
t.Fatalf("Should have received something from error channel: %v", err)
|
||||
}
|
||||
if chErr != nil {
|
||||
t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
@@ -3,6 +3,7 @@ module google.golang.org/grpc/stats/opencensus

go 1.17

require (
	github.com/google/go-cmp v0.5.9
	go.opencensus.io v0.24.0
	google.golang.org/grpc v1.52.0
)
@@ -103,10 +103,13 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

 // TagRPC implements per RPC attempt context management.
 func (csh *clientStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
 	ctx = csh.statsTagRPC(ctx, rti)
 	return ctx
 }

-func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {}
+func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
+	recordRPCData(ctx, rs)
+}

 type serverStatsHandler struct {
 	to TraceOptions

@@ -122,8 +125,11 @@ func (ssh *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

 // TagRPC implements per RPC context management.
 func (ssh *serverStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
 	ctx = ssh.statsTagRPC(ctx, rti)
 	return ctx
 }

 // HandleRPC implements per RPC tracing and stats implementation.
-func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {}
+func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
+	recordRPCData(ctx, rs)
+}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package opencensus
|
||||
|
||||
import (
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
)
|
||||
|
||||
var (
|
||||
keyServerMethod = tag.MustNewKey("grpc_server_method")
|
||||
keyServerStatus = tag.MustNewKey("grpc_server_status")
|
||||
)
|
||||
|
||||
// Measures recorded by the server stats handler. Note that on gRPC's server
// side the per-RPC unit is truly per RPC, as there is no concept of an RPC
// attempt on the server side.
|
||||
var (
|
||||
serverReceivedMessagesPerRPC = stats.Int64("grpc.io/server/received_messages_per_rpc", "Number of messages received in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless) // the collection/measurement point of this measure handles the /rpc aspect of it
|
||||
serverReceivedBytesPerRPC = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes)
|
||||
serverSentMessagesPerRPC = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless)
|
||||
serverSentBytesPerRPC = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes)
|
||||
serverStartedRPCs = stats.Int64("grpc.io/server/started_rpcs", "The total number of server RPCs ever opened, including those that have not completed.", stats.UnitDimensionless)
|
||||
serverLatency = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds)
|
||||
)
|
||||
|
||||
var (
|
||||
// ServerSentMessagesPerRPCView is the distribution of sent messages per
|
||||
// RPC, keyed on method.
|
||||
ServerSentMessagesPerRPCView = &view.View{
|
||||
Name: "grpc.io/server/sent_messages_per_rpc",
|
||||
Description: "Distribution of sent messages per RPC, by method.",
|
||||
TagKeys: []tag.Key{keyServerMethod},
|
||||
Measure: serverSentMessagesPerRPC,
|
||||
Aggregation: countDistribution,
|
||||
}
|
||||
// ServerReceivedMessagesPerRPCView is the distribution of received messages
|
||||
// per RPC, keyed on method.
|
||||
ServerReceivedMessagesPerRPCView = &view.View{
|
||||
Name: "grpc.io/server/received_messages_per_rpc",
|
||||
Description: "Distribution of received messages per RPC, by method.",
|
||||
TagKeys: []tag.Key{keyServerMethod},
|
||||
Measure: serverReceivedMessagesPerRPC,
|
||||
Aggregation: countDistribution,
|
||||
}
|
||||
// ServerSentBytesPerRPCView is the distribution of sent bytes per RPC,
// keyed on method.
|
||||
ServerSentBytesPerRPCView = &view.View{
|
||||
Name: "grpc.io/server/sent_bytes_per_rpc",
|
||||
Description: "Distribution of sent bytes per RPC, by method.",
|
||||
Measure: serverSentBytesPerRPC,
|
||||
TagKeys: []tag.Key{keyServerMethod},
|
||||
Aggregation: bytesDistribution,
|
||||
}
|
||||
// ServerReceivedBytesPerRPCView is the distribution of received bytes per
// RPC, keyed on method.
|
||||
ServerReceivedBytesPerRPCView = &view.View{
|
||||
Name: "grpc.io/server/received_bytes_per_rpc",
|
||||
Description: "Distribution of received bytes per RPC, by method.",
|
||||
Measure: serverReceivedBytesPerRPC,
|
||||
TagKeys: []tag.Key{keyServerMethod},
|
||||
Aggregation: bytesDistribution,
|
||||
}
|
||||
// ServerStartedRPCsView is the count of opened RPCs, keyed on method.
|
||||
ServerStartedRPCsView = &view.View{
|
||||
Measure: serverStartedRPCs,
|
||||
Name: "grpc.io/server/started_rpcs",
|
||||
Description: "Number of opened server RPCs, by method.",
|
||||
TagKeys: []tag.Key{keyServerMethod},
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
// ServerCompletedRPCsView is the count of completed RPCs, keyed on
|
||||
// method and status.
|
||||
ServerCompletedRPCsView = &view.View{
|
||||
Name: "grpc.io/server/completed_rpcs",
|
||||
Description: "Number of completed RPCs by method and status.",
|
||||
TagKeys: []tag.Key{keyServerMethod, keyServerStatus},
|
||||
Measure: serverLatency,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
// ServerLatencyView is the distribution of server latency in milliseconds
|
||||
// per RPC, keyed on method.
|
||||
ServerLatencyView = &view.View{
|
||||
Name: "grpc.io/server/server_latency",
|
||||
Description: "Distribution of server latency in milliseconds, by method.",
|
||||
TagKeys: []tag.Key{keyServerMethod},
|
||||
Measure: serverLatency,
|
||||
Aggregation: millisecondsDistribution,
|
||||
}
|
||||
)
|
||||
|
||||
// DefaultServerViews is the set of server views which are considered the
|
||||
// minimum required to monitor server side performance.
|
||||
var DefaultServerViews = []*view.View{
|
||||
ServerReceivedBytesPerRPCView,
|
||||
ServerSentBytesPerRPCView,
|
||||
ServerLatencyView,
|
||||
ServerCompletedRPCsView,
|
||||
ServerStartedRPCsView,
|
||||
}
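A matching server-side usage sketch (illustrative only; the listener address and service registration are assumptions, while DefaultServerViews, ServerOption, and TraceOptions are the identifiers defined in this diff):

// Illustrative sketch only: register the default server views and install the
// OpenCensus stats handler on a grpc.Server. Exporter setup and generated
// service registration are assumed to exist in the application.
package main

import (
	"log"
	"net"

	"go.opencensus.io/stats/view"
	"google.golang.org/grpc"
	"google.golang.org/grpc/stats/opencensus"
)

func main() {
	if err := view.Register(opencensus.DefaultServerViews...); err != nil {
		log.Fatalf("failed to register server views: %v", err)
	}
	defer view.Unregister(opencensus.DefaultServerViews...)

	lis, err := net.Listen("tcp", ":50051") // hypothetical address
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// ServerOption(TraceOptions{}) attaches the stats handler that records
	// the per-RPC server measures defined above.
	s := grpc.NewServer(opencensus.ServerOption(opencensus.TraceOptions{}))
	// Register generated services on s here, then serve.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}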
|
||||
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package opencensus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
ocstats "go.opencensus.io/stats"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
var logger = grpclog.Component("opencensus-instrumentation")
|
||||
|
||||
var canonicalString = internal.CanonicalString.(func(codes.Code) string)
|
||||
|
||||
type rpcDataKey struct{}
|
||||
|
||||
func setRPCData(ctx context.Context, d *rpcData) context.Context {
|
||||
return context.WithValue(ctx, rpcDataKey{}, d)
|
||||
}
|
||||
|
||||
var (
|
||||
// The bucket bounds are kept in separate variables so tests can reference them.
|
||||
bytesDistributionBounds = []float64{1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296}
|
||||
bytesDistribution = view.Distribution(bytesDistributionBounds...)
|
||||
millisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
|
||||
countDistributionBounds = []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536}
|
||||
countDistribution = view.Distribution(countDistributionBounds...)
|
||||
)
|
||||
|
||||
func removeLeadingSlash(mn string) string {
|
||||
return strings.TrimLeft(mn, "/")
|
||||
}
|
||||
|
||||
// rpcData holds data about an RPC attempt on the client side, and about the
// overall RPC on the server side.
|
||||
type rpcData struct {
|
||||
// access these counts atomically for hedging in the future
|
||||
// number of messages sent from side (client || server)
|
||||
sentMsgs int64
|
||||
// number of bytes sent (within each message) from side (client || server)
|
||||
sentBytes int64
|
||||
// number of messages received on side (client || server)
|
||||
recvMsgs int64
|
||||
// number of bytes received (within each message) received on side (client
|
||||
// || server)
|
||||
recvBytes int64
|
||||
|
||||
startTime time.Time
|
||||
method string
|
||||
}
|
||||
|
||||
// statsTagRPC creates a recording object to derive measurements from in the
|
||||
// context, scoping the recordings to per RPC Attempt client side (scope of the
|
||||
// context). It also populates the gRPC Metadata within the context with any
|
||||
// opencensus specific tags set by the application in the context, binary
|
||||
// encoded to send across the wire.
|
||||
func (csh *clientStatsHandler) statsTagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
||||
d := &rpcData{
|
||||
startTime: time.Now(),
|
||||
method: info.FullMethodName,
|
||||
}
|
||||
// Populate gRPC Metadata with OpenCensus tag map if set by application.
|
||||
if tm := tag.FromContext(ctx); tm != nil {
|
||||
ctx = stats.SetTags(ctx, tag.Encode(tm))
|
||||
}
|
||||
return setRPCData(ctx, d)
|
||||
}
|
||||
|
||||
// statsTagRPC creates a recording object to derive measurements from in the
|
||||
// context, scoping the recordings to per RPC server side (scope of the
|
||||
// context). It also deserializes the opencensus tags set in the context's gRPC
|
||||
// Metadata, and adds a server method tag to the opencensus tags.
|
||||
func (ssh *serverStatsHandler) statsTagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
||||
d := &rpcData{
|
||||
startTime: time.Now(),
|
||||
method: info.FullMethodName,
|
||||
}
|
||||
if tagsBin := stats.Tags(ctx); tagsBin != nil {
|
||||
if tags, err := tag.Decode(tagsBin); err == nil {
|
||||
ctx = tag.NewContext(ctx, tags)
|
||||
}
|
||||
}
|
||||
// We can ignore the error here because in the error case, the context
|
||||
// passed in is returned. If the call errors, the server side application
|
||||
// layer won't get this key server method information in the tag map, but
|
||||
// this instrumentation code will function as normal.
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(keyServerMethod, removeLeadingSlash(info.FullMethodName)))
|
||||
return setRPCData(ctx, d)
|
||||
}
|
||||
|
||||
func recordRPCData(ctx context.Context, s stats.RPCStats) {
|
||||
d, ok := ctx.Value(rpcDataKey{}).(*rpcData)
|
||||
if !ok {
|
||||
// Shouldn't happen, as gRPC calls TagRPC which populates the rpcData in
|
||||
// context.
|
||||
return
|
||||
}
|
||||
switch st := s.(type) {
|
||||
case *stats.InHeader, *stats.OutHeader, *stats.InTrailer, *stats.OutTrailer:
|
||||
// Headers and Trailers are not relevant to the measures, as the
|
||||
// measures concern number of messages and bytes for messages. This
|
||||
// aligns with flow control.
|
||||
case *stats.Begin:
|
||||
recordDataBegin(ctx, d, st)
|
||||
case *stats.OutPayload:
|
||||
recordDataOutPayload(d, st)
|
||||
case *stats.InPayload:
|
||||
recordDataInPayload(d, st)
|
||||
case *stats.End:
|
||||
recordDataEnd(ctx, d, st)
|
||||
default:
|
||||
// Shouldn't happen. gRPC calls into the stats handler only with one of
// the types above.
|
||||
logger.Errorf("Received unexpected stats type (%T) with data: %v", s, s)
|
||||
}
|
||||
}
|
||||
|
||||
// recordDataBegin takes a measurement related to the RPC beginning,
|
||||
// client/server started RPCs dependent on the caller.
|
||||
func recordDataBegin(ctx context.Context, d *rpcData, b *stats.Begin) {
|
||||
if b.Client {
|
||||
ocstats.RecordWithOptions(ctx,
|
||||
ocstats.WithTags(tag.Upsert(keyClientMethod, removeLeadingSlash(d.method))),
|
||||
ocstats.WithMeasurements(clientStartedRPCs.M(1)))
|
||||
return
|
||||
}
|
||||
ocstats.RecordWithOptions(ctx,
|
||||
ocstats.WithTags(tag.Upsert(keyServerMethod, removeLeadingSlash(d.method))),
|
||||
ocstats.WithMeasurements(serverStartedRPCs.M(1)))
|
||||
}
|
||||
|
||||
// recordDataOutPayload records the length in bytes of outgoing messages and
// increases the total count of sent messages, both stored in the RPC's
// (attempt's, on the client side) context data for use in taking measurements
// at RPC end.
|
||||
func recordDataOutPayload(d *rpcData, op *stats.OutPayload) {
|
||||
atomic.AddInt64(&d.sentMsgs, 1)
|
||||
atomic.AddInt64(&d.sentBytes, int64(op.Length))
|
||||
}
|
||||
|
||||
// recordDataInPayload records the length in bytes of incoming messages and
// increases the total count of received messages, both stored in the RPC's
// (attempt's, on the client side) context data for use in taking measurements
// at RPC end.
|
||||
func recordDataInPayload(d *rpcData, ip *stats.InPayload) {
|
||||
atomic.AddInt64(&d.recvMsgs, 1)
|
||||
atomic.AddInt64(&d.recvBytes, int64(ip.Length))
|
||||
}
|
||||
|
||||
// recordDataEnd takes per-RPC measurements derived from information collected
// over the lifetime of the RPC (the RPC attempt on the client side).
|
||||
func recordDataEnd(ctx context.Context, d *rpcData, e *stats.End) {
|
||||
// The latency bounds for distribution data (the specified millisecond
// bounds) have fractional values, so the latency is computed as a float.
|
||||
latency := float64(time.Since(d.startTime)) / float64(time.Millisecond)
|
||||
var st string
|
||||
if e.Error != nil {
|
||||
s, _ := status.FromError(e.Error)
|
||||
st = canonicalString(s.Code())
|
||||
} else {
|
||||
st = "OK"
|
||||
}
|
||||
|
||||
// TODO: Attach trace data through attachments?!?!
|
||||
|
||||
if e.Client {
|
||||
ocstats.RecordWithOptions(ctx,
|
||||
ocstats.WithTags(
|
||||
tag.Upsert(keyClientMethod, removeLeadingSlash(d.method)),
|
||||
tag.Upsert(keyClientStatus, st)),
|
||||
ocstats.WithMeasurements(
|
||||
clientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
|
||||
clientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentMsgs)),
|
||||
clientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvMsgs)),
|
||||
clientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
|
||||
clientRoundtripLatency.M(latency),
|
||||
clientServerLatency.M(latency),
|
||||
))
|
||||
return
|
||||
}
|
||||
ocstats.RecordWithOptions(ctx,
|
||||
ocstats.WithTags(
|
||||
tag.Upsert(keyServerMethod, removeLeadingSlash(d.method)),
|
||||
tag.Upsert(keyServerStatus, st),
|
||||
),
|
||||
ocstats.WithMeasurements(
|
||||
serverSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
|
||||
serverSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentMsgs)),
|
||||
serverReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvMsgs)),
|
||||
serverReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
|
||||
serverLatency.M(latency)))
|
||||
}