diff --git a/call.go b/call.go index 8aabbec67..858bf542e 100644 --- a/call.go +++ b/call.go @@ -64,23 +64,23 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s return } p := &parser{r: stream} - var incomingPayloadStats *stats.IncomingPayloadStats + var inStats *stats.InPayload if stats.On() { - incomingPayloadStats = &stats.IncomingPayloadStats{ - IsClient: true, + inStats = &stats.InPayload{ + Client: true, } } for { - if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, incomingPayloadStats); err != nil { + if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inStats); err != nil { if err == io.EOF { break } return } } - if err == io.EOF && stream.StatusCode() == codes.OK && incomingPayloadStats != nil { - // TODO in the current implementation, incomingTrailerStats is handled before incomingPayloadStats. Fix the order if necessary. - stats.Handle(stream.Context(), incomingPayloadStats) + if err == io.EOF && stream.StatusCode() == codes.OK && inStats != nil { + // TODO in the current implementation, inTrailer is handled before inStats. Fix the order if necessary. + stats.Handle(stream.Context(), inStats) } c.trailerMD = stream.Trailer() return nil @@ -101,25 +101,25 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd } }() var ( - cbuf *bytes.Buffer - outgoingPayloadStats *stats.OutgoingPayloadStats + cbuf *bytes.Buffer + outStats *stats.OutPayload ) if compressor != nil { cbuf = new(bytes.Buffer) } if stats.On() { - outgoingPayloadStats = &stats.OutgoingPayloadStats{ - IsClient: true, + outStats = &stats.OutPayload{ + Client: true, } } - outBuf, err := encode(codec, args, compressor, cbuf, outgoingPayloadStats) + outBuf, err := encode(codec, args, compressor, cbuf, outStats) if err != nil { return nil, Errorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) - if outgoingPayloadStats != nil { - outgoingPayloadStats.SentTime = time.Now() - stats.Handle(stream.Context(), outgoingPayloadStats) + if outStats != nil { + outStats.SentTime = time.Now() + stats.Handle(stream.Context(), outStats) } // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following @@ -179,9 +179,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli ) defer func() { if e != nil && stats.On() { - errorStats := &stats.ErrorStats{ - IsClient: true, - Error: e, + errorStats := &stats.RPCErr{ + Client: true, + Error: e, } if stream != nil { stats.Handle(stream.Context(), errorStats) diff --git a/rpc_util.go b/rpc_util.go index f76bb30c2..909aba2b5 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -257,7 +257,7 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro // encode serializes msg and prepends the message header. If msg is nil, it // generates the message header of 0 message length. -func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoingPayloadStats *stats.OutgoingPayloadStats) ([]byte, error) { +func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStats *stats.OutPayload) ([]byte, error) { var ( b []byte length uint @@ -269,10 +269,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoin if err != nil { return nil, err } - if outgoingPayloadStats != nil { - outgoingPayloadStats.Payload = msg - outgoingPayloadStats.Data = b - outgoingPayloadStats.Length = len(b) + if outStats != nil { + outStats.Payload = msg + outStats.Data = b + outStats.Length = len(b) } if cp != nil { if err := cp.Do(cbuf, b); err != nil { @@ -304,8 +304,8 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoin // Copy encoded msg to buf copy(buf[5:], b) - if outgoingPayloadStats != nil { - outgoingPayloadStats.WireLength = len(buf) + if outStats != nil { + outStats.WireLength = len(buf) } return buf, nil @@ -324,14 +324,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er return nil } -func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, incomingPayloadStats *stats.IncomingPayloadStats) error { +func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inStats *stats.InPayload) error { pf, d, err := p.recvMsg(maxMsgSize) if err != nil { return err } - if incomingPayloadStats != nil { - incomingPayloadStats.ReceivedTime = time.Now() - incomingPayloadStats.WireLength = len(d) + if inStats != nil { + inStats.RecvTime = time.Now() + inStats.WireLength = len(d) } if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil { return err @@ -350,10 +350,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ if err := c.Unmarshal(d, m); err != nil { return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } - if incomingPayloadStats != nil { - incomingPayloadStats.Payload = m - incomingPayloadStats.Data = d - incomingPayloadStats.Length = len(d) + if inStats != nil { + inStats.Payload = m + inStats.Data = d + inStats.Length = len(d) } return nil } diff --git a/server.go b/server.go index 7b3898df9..d65ce5792 100644 --- a/server.go +++ b/server.go @@ -552,16 +552,16 @@ func (s *Server) removeConn(c io.Closer) { func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error { var ( - cbuf *bytes.Buffer - outgoingPayloadStats *stats.OutgoingPayloadStats + cbuf *bytes.Buffer + outStats *stats.OutPayload ) if cp != nil { cbuf = new(bytes.Buffer) } if stats.On() { - outgoingPayloadStats = &stats.OutgoingPayloadStats{} + outStats = &stats.OutPayload{} } - p, err := encode(s.opts.codec, msg, cp, cbuf, outgoingPayloadStats) + p, err := encode(s.opts.codec, msg, cp, cbuf, outStats) if err != nil { // This typically indicates a fatal issue (e.g., memory // corruption or hardware faults) the application program @@ -573,10 +573,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str grpclog.Fatalf("grpc: Server failed to encode response %v", err) } err = t.Write(stream, p, opts) - if outgoingPayloadStats != nil { - outgoingPayloadStats.SentTime = time.Now() + if outStats != nil { + outStats.SentTime = time.Now() - stats.Handle(stream.Context(), outgoingPayloadStats) + stats.Handle(stream.Context(), outStats) } return err } @@ -584,7 +584,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { defer func() { if stats.On() && err != nil && err != io.EOF { - errorStats := &stats.ErrorStats{ + errorStats := &stats.RPCErr{ Error: toRPCErr(err), } stats.Handle(stream.Context(), errorStats) @@ -608,11 +608,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. p := &parser{r: stream} for { pf, req, err := p.recvMsg(s.opts.maxMsgSize) - var incomingPayloadStats *stats.IncomingPayloadStats + var inStats *stats.InPayload if stats.On() { - incomingPayloadStats = &stats.IncomingPayloadStats{ + inStats = &stats.InPayload{ - ReceivedTime: time.Now(), + RecvTime: time.Now(), } } if err == io.EOF { @@ -658,8 +658,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusCode := codes.OK statusDesc := "" df := func(v interface{}) error { - if incomingPayloadStats != nil { - incomingPayloadStats.WireLength = len(req) + if inStats != nil { + inStats.WireLength = len(req) } if pf == compressionMade { var err error @@ -680,11 +680,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err := s.opts.codec.Unmarshal(req, v); err != nil { return err } - if incomingPayloadStats != nil { - incomingPayloadStats.Payload = v - incomingPayloadStats.Data = req - incomingPayloadStats.Length = len(req) - stats.Handle(stream.Context(), incomingPayloadStats) + if inStats != nil { + inStats.Payload = v + inStats.Data = req + inStats.Length = len(req) + stats.Handle(stream.Context(), inStats) } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) @@ -743,7 +743,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { defer func() { if stats.On() && err != nil && err != io.EOF { - errorStats := &stats.ErrorStats{ + errorStats := &stats.RPCErr{ Error: toRPCErr(err), } stats.Handle(stream.Context(), errorStats) @@ -832,7 +832,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str } errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) if stats.On() { - errorStats := &stats.ErrorStats{ + errorStats := &stats.RPCErr{ Error: Errorf(codes.InvalidArgument, errDesc), } stats.Handle(stream.Context(), errorStats) @@ -859,7 +859,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str } errDesc := fmt.Sprintf("unknown service %v", service) if stats.On() { - errorStats := &stats.ErrorStats{ + errorStats := &stats.RPCErr{ Error: Errorf(codes.InvalidArgument, errDesc), } stats.Handle(stream.Context(), errorStats) @@ -891,7 +891,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str } errDesc := fmt.Sprintf("unknown method %v", method) if stats.On() { - errorStats := &stats.ErrorStats{ + errorStats := &stats.RPCErr{ Error: Errorf(codes.InvalidArgument, errDesc), } stats.Handle(stream.Context(), errorStats) diff --git a/stats/stats.go b/stats/stats.go index 116bfce5d..4a5327a4c 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -44,18 +44,18 @@ import ( "golang.org/x/net/context" ) -// Stats contains stats information about RPCs. +// RPCStats contains stats information about RPCs. // All stats types in this package implements this interface. -type Stats interface { +type RPCStats interface { isStats() - // ClientStats indicates if the stats is a client stats. - ClientStats() bool + // IsClient indicates if the stats is a client stats. + IsClient() bool } -// IncomingPayloadStats contains the information for a incoming payload. -type IncomingPayloadStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// InPayload contains the information for a incoming payload. +type InPayload struct { + // Client indicates if this stats is a client stats. + Client bool // Payload is the payload with original type. Payload interface{} // Data is the unencrypted message payload. @@ -64,20 +64,20 @@ type IncomingPayloadStats struct { Length int // WireLength is the length of data on wire (compressed, signed, encrypted). WireLength int - // ReceivedTime is the time when the payload is received. - ReceivedTime time.Time + // RecvTime is the time when the payload is received. + RecvTime time.Time } -func (s *IncomingPayloadStats) isStats() {} +func (s *InPayload) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *IncomingPayloadStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *InPayload) IsClient() bool { return s.Client } -// IncomingHeaderStats indicates a header is received. -// Method, addresses and Encryption are only valid if IsClient is false. -type IncomingHeaderStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// InHeader indicates a header is received. +// Method, addresses and Encryption are only valid if Client is false. +type InHeader struct { + // Client indicates if this stats is a client stats. + Client bool // WireLength is the wire length of header. WireLength int @@ -91,28 +91,28 @@ type IncomingHeaderStats struct { Encryption string } -func (s *IncomingHeaderStats) isStats() {} +func (s *InHeader) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *IncomingHeaderStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *InHeader) IsClient() bool { return s.Client } -// IncomingTrailerStats indicates a trailer is received. -type IncomingTrailerStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// InTrailer indicates a trailer is received. +type InTrailer struct { + // Client indicates if this stats is a client stats. + Client bool // WireLength is the wire length of header. WireLength int } -func (s *IncomingTrailerStats) isStats() {} +func (s *InTrailer) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *IncomingTrailerStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *InTrailer) IsClient() bool { return s.Client } -// OutgoingPayloadStats contains the information for a outgoing payload. -type OutgoingPayloadStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// OutPayload contains the information for a outgoing payload. +type OutPayload struct { + // Client indicates if this stats is a client stats. + Client bool // Payload is the payload with original type. Payload interface{} // Data is the unencrypted message payload. @@ -125,16 +125,16 @@ type OutgoingPayloadStats struct { SentTime time.Time } -func (s *OutgoingPayloadStats) isStats() {} +func (s *OutPayload) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *OutgoingPayloadStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *OutPayload) IsClient() bool { return s.Client } -// OutgoingHeaderStats indicates a header is sent. -// Method, addresses and Encryption are only valid if IsClient is true. -type OutgoingHeaderStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// OutHeader indicates a header is sent. +// Method, addresses and Encryption are only valid if Client is true. +type OutHeader struct { + // Client indicates if this stats is a client stats. + Client bool // WireLength is the wire length of header. WireLength int @@ -148,40 +148,40 @@ type OutgoingHeaderStats struct { Encryption string } -func (s *OutgoingHeaderStats) isStats() {} +func (s *OutHeader) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *OutgoingHeaderStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *OutHeader) IsClient() bool { return s.Client } -// OutgoingTrailerStats indicates a trailer is sent. -type OutgoingTrailerStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// OutTrailer indicates a trailer is sent. +type OutTrailer struct { + // Client indicates if this stats is a client stats. + Client bool // WireLength is the wire length of header. WireLength int } -func (s *OutgoingTrailerStats) isStats() {} +func (s *OutTrailer) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *OutgoingTrailerStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *OutTrailer) IsClient() bool { return s.Client } -// ErrorStats indicates an error happens. -type ErrorStats struct { - // IsClient indicates if this stats is a client stats. - IsClient bool +// RPCErr indicates an error happens. +type RPCErr struct { + // Client indicates if this stats is a client stats. + Client bool // Error is the error just happened. Its type is gRPC error. Error error } -func (s *ErrorStats) isStats() {} +func (s *RPCErr) isStats() {} -// ClientStats indicates if the stats is a client stats. -func (s *ErrorStats) ClientStats() bool { return s.IsClient } +// IsClient indicates if the stats is a client stats. +func (s *RPCErr) IsClient() bool { return s.Client } var ( on = new(int32) - handler func(context.Context, Stats) + handler func(context.Context, RPCStats) ) // On indicates whether stats is started. @@ -190,13 +190,13 @@ func On() bool { } // Handle returns the call back function registered by user to process the stats. -func Handle(ctx context.Context, s Stats) { +func Handle(ctx context.Context, s RPCStats) { handler(ctx, s) } // RegisterHandler registers the user handler function and starts the stats collection. // This handler function will be called to process the stats. -func RegisterHandler(f func(context.Context, Stats)) { +func RegisterHandler(f func(context.Context, RPCStats)) { handler = f start() } diff --git a/stats/stats_test.go b/stats/stats_test.go index db07ed6ee..ddf021e1f 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -288,7 +288,7 @@ type expectedData struct { type gotData struct { ctx context.Context client bool - s stats.Stats + s stats.RPCStats } const ( @@ -302,13 +302,13 @@ const ( errors ) -func checkIncomingHeaderStats(t *testing.T, d *gotData, e *expectedData) { +func checkInHeader(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.IncomingHeaderStats + st *stats.InHeader ) - if st, ok = d.s.(*stats.IncomingHeaderStats); !ok { - t.Fatalf("got %T, want IncomingHeaderStats", d.s) + if st, ok = d.s.(*stats.InHeader); !ok { + t.Fatalf("got %T, want InHeader", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") @@ -330,13 +330,13 @@ func checkIncomingHeaderStats(t *testing.T, d *gotData, e *expectedData) { } } -func checkIncomingPayloadStats(t *testing.T, d *gotData, e *expectedData) { +func checkInPayload(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.IncomingPayloadStats + st *stats.InPayload ) - if st, ok = d.s.(*stats.IncomingPayloadStats); !ok { - t.Fatalf("got %T, want IncomingPayloadStats", d.s) + if st, ok = d.s.(*stats.InPayload); !ok { + t.Fatalf("got %T, want InPayload", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") @@ -373,18 +373,18 @@ func checkIncomingPayloadStats(t *testing.T, d *gotData, e *expectedData) { } } // TODO check WireLength and ReceivedTime. - if st.ReceivedTime.IsZero() { - t.Fatalf("st.ReceivedTime = %v, want ", st.ReceivedTime) + if st.RecvTime.IsZero() { + t.Fatalf("st.ReceivedTime = %v, want ", st.RecvTime) } } -func checkIncomingTrailerStats(t *testing.T, d *gotData, e *expectedData) { +func checkInTrailer(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.IncomingTrailerStats + st *stats.InTrailer ) - if st, ok = d.s.(*stats.IncomingTrailerStats); !ok { - t.Fatalf("got %T, want IncomingTrailerStats", d.s) + if st, ok = d.s.(*stats.InTrailer); !ok { + t.Fatalf("got %T, want InTrailer", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") @@ -395,13 +395,13 @@ func checkIncomingTrailerStats(t *testing.T, d *gotData, e *expectedData) { } } -func checkOutgoingHeaderStats(t *testing.T, d *gotData, e *expectedData) { +func checkOutHeader(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.OutgoingHeaderStats + st *stats.OutHeader ) - if st, ok = d.s.(*stats.OutgoingHeaderStats); !ok { - t.Fatalf("got %T, want OutgoingHeaderStats", d.s) + if st, ok = d.s.(*stats.OutHeader); !ok { + t.Fatalf("got %T, want OutHeader", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") @@ -423,13 +423,13 @@ func checkOutgoingHeaderStats(t *testing.T, d *gotData, e *expectedData) { } } -func checkOutgoingPayloadStats(t *testing.T, d *gotData, e *expectedData) { +func checkOutPayload(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.OutgoingPayloadStats + st *stats.OutPayload ) - if st, ok = d.s.(*stats.OutgoingPayloadStats); !ok { - t.Fatalf("got %T, want OutgoingPayloadStats", d.s) + if st, ok = d.s.(*stats.OutPayload); !ok { + t.Fatalf("got %T, want OutPayload", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") @@ -471,18 +471,18 @@ func checkOutgoingPayloadStats(t *testing.T, d *gotData, e *expectedData) { } } -func checkOutgoingTrailerStats(t *testing.T, d *gotData, e *expectedData) { +func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.OutgoingTrailerStats + st *stats.OutTrailer ) - if st, ok = d.s.(*stats.OutgoingTrailerStats); !ok { - t.Fatalf("got %T, want OutgoingTrailerStats", d.s) + if st, ok = d.s.(*stats.OutTrailer); !ok { + t.Fatalf("got %T, want OutTrailer", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") } - if st.IsClient { + if st.Client { t.Fatalf("st IsClient = true, want false") } // TODO check real length, not just > 0. @@ -494,9 +494,9 @@ func checkOutgoingTrailerStats(t *testing.T, d *gotData, e *expectedData) { func checkErrorStats(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.ErrorStats + st *stats.RPCErr ) - if st, ok = d.s.(*stats.ErrorStats); !ok { + if st, ok = d.s.(*stats.RPCErr); !ok { t.Fatalf("got %T, want ErrorStats", d.s) } if d.ctx == nil { @@ -512,10 +512,10 @@ func TestServerStatsUnaryRPC(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if !s.ClientStats() { + if !s.IsClient() { got = append(got, &gotData{ctx, false, s}) } }) @@ -538,11 +538,11 @@ func TestServerStatsUnaryRPC(t *testing.T) { } checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkIncomingHeaderStats, - checkIncomingPayloadStats, - checkOutgoingHeaderStats, - checkOutgoingPayloadStats, - checkOutgoingTrailerStats, + checkInHeader, + checkInPayload, + checkOutHeader, + checkOutPayload, + checkOutTrailer, } if len(got) != len(checkFuncs) { @@ -563,10 +563,10 @@ func TestServerStatsUnaryRPCError(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if !s.ClientStats() { + if !s.IsClient() { got = append(got, &gotData{ctx, false, s}) } }) @@ -590,10 +590,10 @@ func TestServerStatsUnaryRPCError(t *testing.T) { } checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkIncomingHeaderStats, - checkIncomingPayloadStats, - checkOutgoingHeaderStats, - checkOutgoingTrailerStats, + checkInHeader, + checkInPayload, + checkOutHeader, + checkOutTrailer, checkErrorStats, } @@ -615,10 +615,10 @@ func TestServerStatsStreamingRPC(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if !s.ClientStats() { + if !s.IsClient() { got = append(got, &gotData{ctx, false, s}) } }) @@ -643,17 +643,17 @@ func TestServerStatsStreamingRPC(t *testing.T) { } checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkIncomingHeaderStats, - checkOutgoingHeaderStats, + checkInHeader, + checkOutHeader, } ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkIncomingPayloadStats, - checkOutgoingPayloadStats, + checkInPayload, + checkOutPayload, } for i := 0; i < count; i++ { checkFuncs = append(checkFuncs, ioPayFuncs...) } - checkFuncs = append(checkFuncs, checkOutgoingTrailerStats) + checkFuncs = append(checkFuncs, checkOutTrailer) if len(got) != len(checkFuncs) { t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs)) @@ -673,10 +673,10 @@ func TestServerStatsStreamingRPCError(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if !s.ClientStats() { + if !s.IsClient() { got = append(got, &gotData{ctx, false, s}) } }) @@ -702,10 +702,10 @@ func TestServerStatsStreamingRPCError(t *testing.T) { } checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkIncomingHeaderStats, - checkOutgoingHeaderStats, - checkIncomingPayloadStats, - checkOutgoingTrailerStats, + checkInHeader, + checkOutHeader, + checkInPayload, + checkOutTrailer, checkErrorStats, } @@ -732,10 +732,10 @@ func TestClientStatsUnaryRPC(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if s.ClientStats() { + if s.IsClient() { got = append(got, &gotData{ctx, true, s}) } }) @@ -758,11 +758,11 @@ func TestClientStatsUnaryRPC(t *testing.T) { } checkFuncs := map[int]*checkFuncWithCount{ - outheader: &checkFuncWithCount{checkOutgoingHeaderStats, 1}, - outpay: &checkFuncWithCount{checkOutgoingPayloadStats, 1}, - inheader: &checkFuncWithCount{checkIncomingHeaderStats, 1}, - inpay: &checkFuncWithCount{checkIncomingPayloadStats, 1}, - intrailer: &checkFuncWithCount{checkIncomingTrailerStats, 1}, + outheader: &checkFuncWithCount{checkOutHeader, 1}, + outpay: &checkFuncWithCount{checkOutPayload, 1}, + inheader: &checkFuncWithCount{checkInHeader, 1}, + inpay: &checkFuncWithCount{checkInPayload, 1}, + intrailer: &checkFuncWithCount{checkInTrailer, 1}, } var expectLen int @@ -776,31 +776,31 @@ func TestClientStatsUnaryRPC(t *testing.T) { for _, s := range got { mu.Lock() switch s.s.(type) { - case *stats.OutgoingHeaderStats: + case *stats.OutHeader: if checkFuncs[outheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[outheader].f(t, s, expect) checkFuncs[outheader].c-- - case *stats.OutgoingPayloadStats: + case *stats.OutPayload: if checkFuncs[outpay].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[outpay].f(t, s, expect) checkFuncs[outpay].c-- - case *stats.IncomingHeaderStats: + case *stats.InHeader: if checkFuncs[inheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[inheader].f(t, s, expect) checkFuncs[inheader].c-- - case *stats.IncomingPayloadStats: + case *stats.InPayload: if checkFuncs[inpay].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[inpay].f(t, s, expect) checkFuncs[inpay].c-- - case *stats.IncomingTrailerStats: + case *stats.InTrailer: if checkFuncs[intrailer].c <= 0 { t.Fatalf("unexpected stats: %T", s) } @@ -820,10 +820,10 @@ func TestClientStatsUnaryRPCError(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if s.ClientStats() { + if s.IsClient() { got = append(got, &gotData{ctx, true, s}) } }) @@ -847,10 +847,10 @@ func TestClientStatsUnaryRPCError(t *testing.T) { } checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkOutgoingHeaderStats, - checkOutgoingPayloadStats, - checkIncomingHeaderStats, - checkIncomingTrailerStats, + checkOutHeader, + checkOutPayload, + checkInHeader, + checkInTrailer, checkErrorStats, } @@ -872,10 +872,10 @@ func TestClientStatsStreamingRPC(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if s.ClientStats() { + if s.IsClient() { got = append(got, &gotData{ctx, true, s}) } }) @@ -900,11 +900,11 @@ func TestClientStatsStreamingRPC(t *testing.T) { } checkFuncs := map[int]*checkFuncWithCount{ - outheader: &checkFuncWithCount{checkOutgoingHeaderStats, 1}, - outpay: &checkFuncWithCount{checkOutgoingPayloadStats, count}, - inheader: &checkFuncWithCount{checkIncomingHeaderStats, 1}, - inpay: &checkFuncWithCount{checkIncomingPayloadStats, count}, - intrailer: &checkFuncWithCount{checkIncomingTrailerStats, 1}, + outheader: &checkFuncWithCount{checkOutHeader, 1}, + outpay: &checkFuncWithCount{checkOutPayload, count}, + inheader: &checkFuncWithCount{checkInHeader, 1}, + inpay: &checkFuncWithCount{checkInPayload, count}, + intrailer: &checkFuncWithCount{checkInTrailer, 1}, } var expectLen int @@ -918,31 +918,31 @@ func TestClientStatsStreamingRPC(t *testing.T) { for _, s := range got { mu.Lock() switch s.s.(type) { - case *stats.OutgoingHeaderStats: + case *stats.OutHeader: if checkFuncs[outheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[outheader].f(t, s, expect) checkFuncs[outheader].c-- - case *stats.OutgoingPayloadStats: + case *stats.OutPayload: if checkFuncs[outpay].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[outpay].f(t, s, expect) checkFuncs[outpay].c-- - case *stats.IncomingHeaderStats: + case *stats.InHeader: if checkFuncs[inheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[inheader].f(t, s, expect) checkFuncs[inheader].c-- - case *stats.IncomingPayloadStats: + case *stats.InPayload: if checkFuncs[inpay].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[inpay].f(t, s, expect) checkFuncs[inpay].c-- - case *stats.IncomingTrailerStats: + case *stats.InTrailer: if checkFuncs[intrailer].c <= 0 { t.Fatalf("unexpected stats: %T", s) } @@ -962,10 +962,10 @@ func TestClientStatsStreamingRPCError(t *testing.T) { mu sync.Mutex got []*gotData ) - stats.RegisterHandler(func(ctx context.Context, s stats.Stats) { + stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() - if s.ClientStats() { + if s.IsClient() { got = append(got, &gotData{ctx, true, s}) } }) @@ -991,10 +991,10 @@ func TestClientStatsStreamingRPCError(t *testing.T) { } checkFuncs := map[int]*checkFuncWithCount{ - outheader: &checkFuncWithCount{checkOutgoingHeaderStats, 1}, - outpay: &checkFuncWithCount{checkOutgoingPayloadStats, 1}, - inheader: &checkFuncWithCount{checkIncomingHeaderStats, 1}, - intrailer: &checkFuncWithCount{checkIncomingTrailerStats, 1}, + outheader: &checkFuncWithCount{checkOutHeader, 1}, + outpay: &checkFuncWithCount{checkOutPayload, 1}, + inheader: &checkFuncWithCount{checkInHeader, 1}, + intrailer: &checkFuncWithCount{checkInTrailer, 1}, errors: &checkFuncWithCount{checkErrorStats, 1}, } @@ -1009,37 +1009,37 @@ func TestClientStatsStreamingRPCError(t *testing.T) { for _, s := range got { mu.Lock() switch s.s.(type) { - case *stats.OutgoingHeaderStats: + case *stats.OutHeader: if checkFuncs[outheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[outheader].f(t, s, expect) checkFuncs[outheader].c-- - case *stats.OutgoingPayloadStats: + case *stats.OutPayload: if checkFuncs[outpay].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[outpay].f(t, s, expect) checkFuncs[outpay].c-- - case *stats.IncomingHeaderStats: + case *stats.InHeader: if checkFuncs[inheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[inheader].f(t, s, expect) checkFuncs[inheader].c-- - case *stats.IncomingPayloadStats: + case *stats.InPayload: if checkFuncs[inpay].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[inpay].f(t, s, expect) checkFuncs[inpay].c-- - case *stats.IncomingTrailerStats: + case *stats.InTrailer: if checkFuncs[intrailer].c <= 0 { t.Fatalf("unexpected stats: %T", s) } checkFuncs[intrailer].f(t, s, expect) checkFuncs[intrailer].c-- - case *stats.ErrorStats: + case *stats.RPCErr: if checkFuncs[errors].c <= 0 { t.Fatalf("unexpected stats: %T", s) } diff --git a/stream.go b/stream.go index b1fc9634d..a1d03c4f1 100644 --- a/stream.go +++ b/stream.go @@ -101,9 +101,9 @@ type ClientStream interface { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { defer func() { if err != nil && stats.On() { - errorStats := &stats.ErrorStats{ - IsClient: true, - Error: err, + errorStats := &stats.RPCErr{ + Client: true, + Error: err, } stats.Handle(ctx, errorStats) } @@ -265,9 +265,9 @@ func (cs *clientStream) Context() context.Context { func (cs *clientStream) Header() (_ metadata.MD, err error) { defer func() { if err != nil && stats.On() { - errorStats := &stats.ErrorStats{ - IsClient: true, - Error: err, + errorStats := &stats.RPCErr{ + Client: true, + Error: err, } stats.Handle(cs.s.Context(), errorStats) } @@ -295,9 +295,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } defer func() { if err != nil && stats.On() { - errorStats := &stats.ErrorStats{ - IsClient: true, - Error: err, + errorStats := &stats.RPCErr{ + Client: true, + Error: err, } stats.Handle(cs.s.Context(), errorStats) } @@ -324,13 +324,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } err = toRPCErr(err) }() - var outgoingPayloadStats *stats.OutgoingPayloadStats + var outStats *stats.OutPayload if stats.On() { - outgoingPayloadStats = &stats.OutgoingPayloadStats{ - IsClient: true, + outStats = &stats.OutPayload{ + Client: true, } } - out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outgoingPayloadStats) + out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outStats) defer func() { if cs.cbuf != nil { cs.cbuf.Reset() @@ -340,9 +340,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { return Errorf(codes.Internal, "grpc: %v", err) } err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) - if outgoingPayloadStats != nil { - outgoingPayloadStats.SentTime = time.Now() - stats.Handle(cs.s.Context(), outgoingPayloadStats) + if outStats != nil { + outStats.SentTime = time.Now() + stats.Handle(cs.s.Context(), outStats) } return err } @@ -350,20 +350,20 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { func (cs *clientStream) RecvMsg(m interface{}) (err error) { defer func() { if err != nil && err != io.EOF && stats.On() { - errorStats := &stats.ErrorStats{ - IsClient: true, - Error: err, + errorStats := &stats.RPCErr{ + Client: true, + Error: err, } stats.Handle(cs.s.Context(), errorStats) } }() - var incomingPayloadStats *stats.IncomingPayloadStats + var inStats *stats.InPayload if stats.On() { - incomingPayloadStats = &stats.IncomingPayloadStats{ - IsClient: true, + inStats = &stats.InPayload{ + Client: true, } } - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, incomingPayloadStats) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inStats) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -378,14 +378,14 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } cs.mu.Unlock() } - if incomingPayloadStats != nil { - stats.Handle(cs.s.Context(), incomingPayloadStats) + if inStats != nil { + stats.Handle(cs.s.Context(), inStats) } if !cs.desc.ClientStreams || cs.desc.ServerStreams { return } // Special handling for client streaming rpc. - // This recv expects EOF or errors, so we don't collect incomingPayloadStats. + // This recv expects EOF or errors, so we don't collect inStats. err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil) cs.closeTransportStream(err) if err == nil { @@ -540,11 +540,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - var outgoingPayloadStats *stats.OutgoingPayloadStats + var outStats *stats.OutPayload if stats.On() { - outgoingPayloadStats = &stats.OutgoingPayloadStats{} + outStats = &stats.OutPayload{} } - out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outgoingPayloadStats) + out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outStats) defer func() { if ss.cbuf != nil { ss.cbuf.Reset() @@ -557,9 +557,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { return toRPCErr(err) } - if outgoingPayloadStats != nil { - outgoingPayloadStats.SentTime = time.Now() - stats.Handle(ss.s.Context(), outgoingPayloadStats) + if outStats != nil { + outStats.SentTime = time.Now() + stats.Handle(ss.s.Context(), outStats) } return nil } @@ -579,11 +579,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - var incomingPayloadStats *stats.IncomingPayloadStats + var inStats *stats.InPayload if stats.On() { - incomingPayloadStats = &stats.IncomingPayloadStats{} + inStats = &stats.InPayload{} } - if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, incomingPayloadStats); err != nil { + if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inStats); err != nil { if err == io.EOF { return err } @@ -592,8 +592,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { } return toRPCErr(err) } - if incomingPayloadStats != nil { - stats.Handle(ss.s.Context(), incomingPayloadStats) + if inStats != nil { + stats.Handle(ss.s.Context(), inStats) } return nil } diff --git a/transport/http2_client.go b/transport/http2_client.go index c663bdbe3..d7e588657 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -450,15 +450,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } } if stats.On() { - outgoingHeaderStats := &stats.OutgoingHeaderStats{ - IsClient: true, + outHeader := &stats.OutHeader{ + Client: true, WireLength: bufLen, Method: callHdr.Method, RemoteAddr: t.RemoteAddr(), LocalAddr: t.LocalAddr(), Encryption: callHdr.SendCompress, } - stats.Handle(s.Context(), outgoingHeaderStats) + stats.Handle(s.Context(), outHeader) } t.writableChan <- 0 return s, nil @@ -891,17 +891,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { defer func() { if stats.On() { if isHeader { - incomingHeaderStats := &stats.IncomingHeaderStats{ - IsClient: true, + inHeader := &stats.InHeader{ + Client: true, WireLength: int(frame.Header().Length), } - stats.Handle(s.ctx, incomingHeaderStats) + stats.Handle(s.ctx, inHeader) } else { - incomingTrailerStats := &stats.IncomingTrailerStats{ - IsClient: true, + inTrailer := &stats.InTrailer{ + Client: true, WireLength: int(frame.Header().Length), } - stats.Handle(s.ctx, incomingTrailerStats) + stats.Handle(s.ctx, inTrailer) } } }() diff --git a/transport/http2_server.go b/transport/http2_server.go index f3eb0f758..a38a8a3a5 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -236,14 +236,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( t.updateWindow(s, uint32(n)) } if stats.On() { - incomingHeaderStats := &stats.IncomingHeaderStats{ + inHeader := &stats.InHeader{ Method: s.method, RemoteAddr: t.conn.RemoteAddr(), LocalAddr: t.conn.LocalAddr(), Encryption: s.recvCompress, WireLength: int(frame.Header().Length), } - stats.Handle(s.ctx, incomingHeaderStats) + stats.Handle(s.ctx, inHeader) } handle(s) return @@ -524,10 +524,10 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { return err } if stats.On() { - outgoingHeaderStats := &stats.OutgoingHeaderStats{ + outHeader := &stats.OutHeader{ WireLength: bufLen, } - stats.Handle(s.Context(), outgoingHeaderStats) + stats.Handle(s.Context(), outHeader) } t.writableChan <- 0 return nil @@ -587,10 +587,10 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s return err } if stats.On() { - outgoingTrailerStats := &stats.OutgoingTrailerStats{ + outTrailer := &stats.OutTrailer{ WireLength: bufLen, } - stats.Handle(s.Context(), outgoingTrailerStats) + stats.Handle(s.Context(), outTrailer) } t.closeStream(s) t.writableChan <- 0