diff --git a/stats/stats.go b/stats/stats.go index 6969a1c35..450ea0b03 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -32,7 +32,7 @@ */ // Package stats is for collecting and reporting various network and RPC stats. -// This package is for monitoring purpose only. +// This package is for monitoring purpose only. All fields are read-only. // All APIs are experimental. package stats // import "google.golang.org/grpc/stats" @@ -72,7 +72,7 @@ type InPayload struct { Client bool // Payload is the payload with original type. Payload interface{} - // Data is the unencrypted message payload. + // Data is the serialized message payload. Data []byte // Length is the length of uncompressed data. Length int @@ -99,7 +99,7 @@ type InHeader struct { RemoteAddr net.Addr // LocalAddr is the local address of the corresponding connection. LocalAddr net.Addr - // Encryption is encrypt method used in the RPC. + // Encryption is the encryption algorithm used for the RPC. Encryption string } @@ -123,7 +123,7 @@ type OutPayload struct { Client bool // Payload is the payload with original type. Payload interface{} - // Data is the unencrypted message payload. + // Data is the serialized message payload. Data []byte // Length is the length of uncompressed data. Length int @@ -150,7 +150,7 @@ type OutHeader struct { RemoteAddr net.Addr // LocalAddr is the local address of the corresponding connection. LocalAddr net.Addr - // Encryption is encrypt method used in the RPC. + // Encryption is the encryption algorithm used for the RPC. Encryption string } diff --git a/stream.go b/stream.go index 2d66399e6..362204524 100644 --- a/stream.go +++ b/stream.go @@ -214,7 +214,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth tracing: EnableTracing, trInfo: trInfo, - userCtx: ctx, + statsCtx: ctx, } if cc.dopts.cp != nil { cs.cbuf = new(bytes.Buffer) @@ -268,9 +268,10 @@ type clientStream struct { // and is set to nil when the clientStream's finish method is called. trInfo traceInfo - // Keep the user context for stats handling. - // All stats handling should use the user context instead of the stream context. - userCtx context.Context + // statsCtx keeps the user context for stats handling. + // All stats collection should use the statsCtx (instead of the stream context) + // so that all the generated stats for a particular RPC can be associated in the processing phase. + statsCtx context.Context } func (cs *clientStream) Context() context.Context { @@ -286,7 +287,7 @@ func (cs *clientStream) Header() (_ metadata.MD, err error) { EndTime: time.Now(), Error: err, } - stats.Handle(cs.userCtx, end) + stats.Handle(cs.statsCtx, end) } }() m, err := cs.s.Header() @@ -318,7 +319,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { Client: true, Error: err, } - stats.Handle(cs.userCtx, end) + stats.Handle(cs.statsCtx, end) } }() defer func() { @@ -361,7 +362,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) if err == nil && outPayload != nil { outPayload.SentTime = time.Now() - stats.Handle(cs.userCtx, outPayload) + stats.Handle(cs.statsCtx, outPayload) } return err } @@ -378,7 +379,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { EndTime: time.Now(), Error: e, } - stats.Handle(cs.userCtx, end) + stats.Handle(cs.statsCtx, end) } }() var inPayload *stats.InPayload @@ -403,7 +404,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { cs.mu.Unlock() } if inPayload != nil { - stats.Handle(cs.userCtx, inPayload) + stats.Handle(cs.statsCtx, inPayload) } if !cs.desc.ClientStreams || cs.desc.ServerStreams { return diff --git a/transport/http2_client.go b/transport/http2_client.go index b0dbe07cb..f8bf747e1 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -349,7 +349,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, ErrConnClosing } s := t.newStream(ctx, callHdr) - s.userCtx = userCtx + s.clientStatsCtx = userCtx t.activeStreams[s.id] = s // This stream is not counted when applySetings(...) initialize t.streamsQuota. @@ -460,7 +460,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea LocalAddr: t.conn.LocalAddr(), Encryption: callHdr.SendCompress, } - stats.Handle(s.userCtx, outHeader) + stats.Handle(s.clientStatsCtx, outHeader) } t.writableChan <- 0 return s, nil @@ -897,13 +897,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { Client: true, WireLength: int(frame.Header().Length), } - stats.Handle(s.userCtx, inHeader) + stats.Handle(s.clientStatsCtx, inHeader) } else { inTrailer := &stats.InTrailer{ Client: true, WireLength: int(frame.Header().Length), } - stats.Handle(s.userCtx, inTrailer) + stats.Handle(s.clientStatsCtx, inTrailer) } } }() diff --git a/transport/transport.go b/transport/transport.go index 04582fd6a..82a2d7f2e 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -168,9 +168,11 @@ type Stream struct { id uint32 // nil for client side Stream. st ServerTransport - // Keep the user context for stats handling. - // All stats handling should use the user context instead of the stream context. - userCtx context.Context + // clientStatsCtx keeps the user context for stats handling. + // It's only valid on client side. Server side stats context is same as s.ctx. + // All client side stats collection should use the clientStatsCtx (instead of the stream context) + // so that all the generated stats for a particular RPC can be associated in the processing phase. + clientStatsCtx context.Context // ctx is the associated context of the stream. ctx context.Context // cancel is always nil for client side Stream.