Renaming types and variables

This commit is contained in:
Menghan Li 2016-11-02 15:45:22 -07:00
parent e42a66c81b
commit c698588285
8 changed files with 265 additions and 265 deletions

36
call.go
View File

@ -64,23 +64,23 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
return
}
p := &parser{r: stream}
var incomingPayloadStats *stats.IncomingPayloadStats
var inStats *stats.InPayload
if stats.On() {
incomingPayloadStats = &stats.IncomingPayloadStats{
IsClient: true,
inStats = &stats.InPayload{
Client: true,
}
}
for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, incomingPayloadStats); err != nil {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inStats); err != nil {
if err == io.EOF {
break
}
return
}
}
if err == io.EOF && stream.StatusCode() == codes.OK && incomingPayloadStats != nil {
// TODO in the current implementation, incomingTrailerStats is handled before incomingPayloadStats. Fix the order if necessary.
stats.Handle(stream.Context(), incomingPayloadStats)
if err == io.EOF && stream.StatusCode() == codes.OK && inStats != nil {
// TODO in the current implementation, inTrailer is handled before inStats. Fix the order if necessary.
stats.Handle(stream.Context(), inStats)
}
c.trailerMD = stream.Trailer()
return nil
@ -101,25 +101,25 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
}()
var (
cbuf *bytes.Buffer
outgoingPayloadStats *stats.OutgoingPayloadStats
cbuf *bytes.Buffer
outStats *stats.OutPayload
)
if compressor != nil {
cbuf = new(bytes.Buffer)
}
if stats.On() {
outgoingPayloadStats = &stats.OutgoingPayloadStats{
IsClient: true,
outStats = &stats.OutPayload{
Client: true,
}
}
outBuf, err := encode(codec, args, compressor, cbuf, outgoingPayloadStats)
outBuf, err := encode(codec, args, compressor, cbuf, outStats)
if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
if outgoingPayloadStats != nil {
outgoingPayloadStats.SentTime = time.Now()
stats.Handle(stream.Context(), outgoingPayloadStats)
if outStats != nil {
outStats.SentTime = time.Now()
stats.Handle(stream.Context(), outStats)
}
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
@ -179,9 +179,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
)
defer func() {
if e != nil && stats.On() {
errorStats := &stats.ErrorStats{
IsClient: true,
Error: e,
errorStats := &stats.RPCErr{
Client: true,
Error: e,
}
if stream != nil {
stats.Handle(stream.Context(), errorStats)

View File

@ -257,7 +257,7 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoingPayloadStats *stats.OutgoingPayloadStats) ([]byte, error) {
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStats *stats.OutPayload) ([]byte, error) {
var (
b []byte
length uint
@ -269,10 +269,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoin
if err != nil {
return nil, err
}
if outgoingPayloadStats != nil {
outgoingPayloadStats.Payload = msg
outgoingPayloadStats.Data = b
outgoingPayloadStats.Length = len(b)
if outStats != nil {
outStats.Payload = msg
outStats.Data = b
outStats.Length = len(b)
}
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
@ -304,8 +304,8 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoin
// Copy encoded msg to buf
copy(buf[5:], b)
if outgoingPayloadStats != nil {
outgoingPayloadStats.WireLength = len(buf)
if outStats != nil {
outStats.WireLength = len(buf)
}
return buf, nil
@ -324,14 +324,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, incomingPayloadStats *stats.IncomingPayloadStats) error {
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inStats *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
if incomingPayloadStats != nil {
incomingPayloadStats.ReceivedTime = time.Now()
incomingPayloadStats.WireLength = len(d)
if inStats != nil {
inStats.RecvTime = time.Now()
inStats.WireLength = len(d)
}
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
@ -350,10 +350,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
if incomingPayloadStats != nil {
incomingPayloadStats.Payload = m
incomingPayloadStats.Data = d
incomingPayloadStats.Length = len(d)
if inStats != nil {
inStats.Payload = m
inStats.Data = d
inStats.Length = len(d)
}
return nil
}

View File

@ -552,16 +552,16 @@ func (s *Server) removeConn(c io.Closer) {
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
var (
cbuf *bytes.Buffer
outgoingPayloadStats *stats.OutgoingPayloadStats
cbuf *bytes.Buffer
outStats *stats.OutPayload
)
if cp != nil {
cbuf = new(bytes.Buffer)
}
if stats.On() {
outgoingPayloadStats = &stats.OutgoingPayloadStats{}
outStats = &stats.OutPayload{}
}
p, err := encode(s.opts.codec, msg, cp, cbuf, outgoingPayloadStats)
p, err := encode(s.opts.codec, msg, cp, cbuf, outStats)
if err != nil {
// This typically indicates a fatal issue (e.g., memory
// corruption or hardware faults) the application program
@ -573,10 +573,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
}
err = t.Write(stream, p, opts)
if outgoingPayloadStats != nil {
outgoingPayloadStats.SentTime = time.Now()
if outStats != nil {
outStats.SentTime = time.Now()
stats.Handle(stream.Context(), outgoingPayloadStats)
stats.Handle(stream.Context(), outStats)
}
return err
}
@ -584,7 +584,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
defer func() {
if stats.On() && err != nil && err != io.EOF {
errorStats := &stats.ErrorStats{
errorStats := &stats.RPCErr{
Error: toRPCErr(err),
}
stats.Handle(stream.Context(), errorStats)
@ -608,11 +608,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
p := &parser{r: stream}
for {
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
var incomingPayloadStats *stats.IncomingPayloadStats
var inStats *stats.InPayload
if stats.On() {
incomingPayloadStats = &stats.IncomingPayloadStats{
inStats = &stats.InPayload{
ReceivedTime: time.Now(),
RecvTime: time.Now(),
}
}
if err == io.EOF {
@ -658,8 +658,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
statusCode := codes.OK
statusDesc := ""
df := func(v interface{}) error {
if incomingPayloadStats != nil {
incomingPayloadStats.WireLength = len(req)
if inStats != nil {
inStats.WireLength = len(req)
}
if pf == compressionMade {
var err error
@ -680,11 +680,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if incomingPayloadStats != nil {
incomingPayloadStats.Payload = v
incomingPayloadStats.Data = req
incomingPayloadStats.Length = len(req)
stats.Handle(stream.Context(), incomingPayloadStats)
if inStats != nil {
inStats.Payload = v
inStats.Data = req
inStats.Length = len(req)
stats.Handle(stream.Context(), inStats)
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
@ -743,7 +743,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
defer func() {
if stats.On() && err != nil && err != io.EOF {
errorStats := &stats.ErrorStats{
errorStats := &stats.RPCErr{
Error: toRPCErr(err),
}
stats.Handle(stream.Context(), errorStats)
@ -832,7 +832,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if stats.On() {
errorStats := &stats.ErrorStats{
errorStats := &stats.RPCErr{
Error: Errorf(codes.InvalidArgument, errDesc),
}
stats.Handle(stream.Context(), errorStats)
@ -859,7 +859,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
errDesc := fmt.Sprintf("unknown service %v", service)
if stats.On() {
errorStats := &stats.ErrorStats{
errorStats := &stats.RPCErr{
Error: Errorf(codes.InvalidArgument, errDesc),
}
stats.Handle(stream.Context(), errorStats)
@ -891,7 +891,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
errDesc := fmt.Sprintf("unknown method %v", method)
if stats.On() {
errorStats := &stats.ErrorStats{
errorStats := &stats.RPCErr{
Error: Errorf(codes.InvalidArgument, errDesc),
}
stats.Handle(stream.Context(), errorStats)

View File

@ -44,18 +44,18 @@ import (
"golang.org/x/net/context"
)
// Stats contains stats information about RPCs.
// RPCStats contains stats information about RPCs.
// All stats types in this package implements this interface.
type Stats interface {
type RPCStats interface {
isStats()
// ClientStats indicates if the stats is a client stats.
ClientStats() bool
// IsClient indicates if the stats is a client stats.
IsClient() bool
}
// IncomingPayloadStats contains the information for a incoming payload.
type IncomingPayloadStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// InPayload contains the information for a incoming payload.
type InPayload struct {
// Client indicates if this stats is a client stats.
Client bool
// Payload is the payload with original type.
Payload interface{}
// Data is the unencrypted message payload.
@ -64,20 +64,20 @@ type IncomingPayloadStats struct {
Length int
// WireLength is the length of data on wire (compressed, signed, encrypted).
WireLength int
// ReceivedTime is the time when the payload is received.
ReceivedTime time.Time
// RecvTime is the time when the payload is received.
RecvTime time.Time
}
func (s *IncomingPayloadStats) isStats() {}
func (s *InPayload) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *IncomingPayloadStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *InPayload) IsClient() bool { return s.Client }
// IncomingHeaderStats indicates a header is received.
// Method, addresses and Encryption are only valid if IsClient is false.
type IncomingHeaderStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// InHeader indicates a header is received.
// Method, addresses and Encryption are only valid if Client is false.
type InHeader struct {
// Client indicates if this stats is a client stats.
Client bool
// WireLength is the wire length of header.
WireLength int
@ -91,28 +91,28 @@ type IncomingHeaderStats struct {
Encryption string
}
func (s *IncomingHeaderStats) isStats() {}
func (s *InHeader) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *IncomingHeaderStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *InHeader) IsClient() bool { return s.Client }
// IncomingTrailerStats indicates a trailer is received.
type IncomingTrailerStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// InTrailer indicates a trailer is received.
type InTrailer struct {
// Client indicates if this stats is a client stats.
Client bool
// WireLength is the wire length of header.
WireLength int
}
func (s *IncomingTrailerStats) isStats() {}
func (s *InTrailer) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *IncomingTrailerStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *InTrailer) IsClient() bool { return s.Client }
// OutgoingPayloadStats contains the information for a outgoing payload.
type OutgoingPayloadStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// OutPayload contains the information for a outgoing payload.
type OutPayload struct {
// Client indicates if this stats is a client stats.
Client bool
// Payload is the payload with original type.
Payload interface{}
// Data is the unencrypted message payload.
@ -125,16 +125,16 @@ type OutgoingPayloadStats struct {
SentTime time.Time
}
func (s *OutgoingPayloadStats) isStats() {}
func (s *OutPayload) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *OutgoingPayloadStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *OutPayload) IsClient() bool { return s.Client }
// OutgoingHeaderStats indicates a header is sent.
// Method, addresses and Encryption are only valid if IsClient is true.
type OutgoingHeaderStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// OutHeader indicates a header is sent.
// Method, addresses and Encryption are only valid if Client is true.
type OutHeader struct {
// Client indicates if this stats is a client stats.
Client bool
// WireLength is the wire length of header.
WireLength int
@ -148,40 +148,40 @@ type OutgoingHeaderStats struct {
Encryption string
}
func (s *OutgoingHeaderStats) isStats() {}
func (s *OutHeader) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *OutgoingHeaderStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *OutHeader) IsClient() bool { return s.Client }
// OutgoingTrailerStats indicates a trailer is sent.
type OutgoingTrailerStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// OutTrailer indicates a trailer is sent.
type OutTrailer struct {
// Client indicates if this stats is a client stats.
Client bool
// WireLength is the wire length of header.
WireLength int
}
func (s *OutgoingTrailerStats) isStats() {}
func (s *OutTrailer) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *OutgoingTrailerStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *OutTrailer) IsClient() bool { return s.Client }
// ErrorStats indicates an error happens.
type ErrorStats struct {
// IsClient indicates if this stats is a client stats.
IsClient bool
// RPCErr indicates an error happens.
type RPCErr struct {
// Client indicates if this stats is a client stats.
Client bool
// Error is the error just happened. Its type is gRPC error.
Error error
}
func (s *ErrorStats) isStats() {}
func (s *RPCErr) isStats() {}
// ClientStats indicates if the stats is a client stats.
func (s *ErrorStats) ClientStats() bool { return s.IsClient }
// IsClient indicates if the stats is a client stats.
func (s *RPCErr) IsClient() bool { return s.Client }
var (
on = new(int32)
handler func(context.Context, Stats)
handler func(context.Context, RPCStats)
)
// On indicates whether stats is started.
@ -190,13 +190,13 @@ func On() bool {
}
// Handle returns the call back function registered by user to process the stats.
func Handle(ctx context.Context, s Stats) {
func Handle(ctx context.Context, s RPCStats) {
handler(ctx, s)
}
// RegisterHandler registers the user handler function and starts the stats collection.
// This handler function will be called to process the stats.
func RegisterHandler(f func(context.Context, Stats)) {
func RegisterHandler(f func(context.Context, RPCStats)) {
handler = f
start()
}

View File

@ -288,7 +288,7 @@ type expectedData struct {
type gotData struct {
ctx context.Context
client bool
s stats.Stats
s stats.RPCStats
}
const (
@ -302,13 +302,13 @@ const (
errors
)
func checkIncomingHeaderStats(t *testing.T, d *gotData, e *expectedData) {
func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.IncomingHeaderStats
st *stats.InHeader
)
if st, ok = d.s.(*stats.IncomingHeaderStats); !ok {
t.Fatalf("got %T, want IncomingHeaderStats", d.s)
if st, ok = d.s.(*stats.InHeader); !ok {
t.Fatalf("got %T, want InHeader", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
@ -330,13 +330,13 @@ func checkIncomingHeaderStats(t *testing.T, d *gotData, e *expectedData) {
}
}
func checkIncomingPayloadStats(t *testing.T, d *gotData, e *expectedData) {
func checkInPayload(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.IncomingPayloadStats
st *stats.InPayload
)
if st, ok = d.s.(*stats.IncomingPayloadStats); !ok {
t.Fatalf("got %T, want IncomingPayloadStats", d.s)
if st, ok = d.s.(*stats.InPayload); !ok {
t.Fatalf("got %T, want InPayload", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
@ -373,18 +373,18 @@ func checkIncomingPayloadStats(t *testing.T, d *gotData, e *expectedData) {
}
}
// TODO check WireLength and ReceivedTime.
if st.ReceivedTime.IsZero() {
t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.ReceivedTime)
if st.RecvTime.IsZero() {
t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime)
}
}
func checkIncomingTrailerStats(t *testing.T, d *gotData, e *expectedData) {
func checkInTrailer(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.IncomingTrailerStats
st *stats.InTrailer
)
if st, ok = d.s.(*stats.IncomingTrailerStats); !ok {
t.Fatalf("got %T, want IncomingTrailerStats", d.s)
if st, ok = d.s.(*stats.InTrailer); !ok {
t.Fatalf("got %T, want InTrailer", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
@ -395,13 +395,13 @@ func checkIncomingTrailerStats(t *testing.T, d *gotData, e *expectedData) {
}
}
func checkOutgoingHeaderStats(t *testing.T, d *gotData, e *expectedData) {
func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.OutgoingHeaderStats
st *stats.OutHeader
)
if st, ok = d.s.(*stats.OutgoingHeaderStats); !ok {
t.Fatalf("got %T, want OutgoingHeaderStats", d.s)
if st, ok = d.s.(*stats.OutHeader); !ok {
t.Fatalf("got %T, want OutHeader", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
@ -423,13 +423,13 @@ func checkOutgoingHeaderStats(t *testing.T, d *gotData, e *expectedData) {
}
}
func checkOutgoingPayloadStats(t *testing.T, d *gotData, e *expectedData) {
func checkOutPayload(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.OutgoingPayloadStats
st *stats.OutPayload
)
if st, ok = d.s.(*stats.OutgoingPayloadStats); !ok {
t.Fatalf("got %T, want OutgoingPayloadStats", d.s)
if st, ok = d.s.(*stats.OutPayload); !ok {
t.Fatalf("got %T, want OutPayload", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
@ -471,18 +471,18 @@ func checkOutgoingPayloadStats(t *testing.T, d *gotData, e *expectedData) {
}
}
func checkOutgoingTrailerStats(t *testing.T, d *gotData, e *expectedData) {
func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.OutgoingTrailerStats
st *stats.OutTrailer
)
if st, ok = d.s.(*stats.OutgoingTrailerStats); !ok {
t.Fatalf("got %T, want OutgoingTrailerStats", d.s)
if st, ok = d.s.(*stats.OutTrailer); !ok {
t.Fatalf("got %T, want OutTrailer", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
}
if st.IsClient {
if st.Client {
t.Fatalf("st IsClient = true, want false")
}
// TODO check real length, not just > 0.
@ -494,9 +494,9 @@ func checkOutgoingTrailerStats(t *testing.T, d *gotData, e *expectedData) {
func checkErrorStats(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.ErrorStats
st *stats.RPCErr
)
if st, ok = d.s.(*stats.ErrorStats); !ok {
if st, ok = d.s.(*stats.RPCErr); !ok {
t.Fatalf("got %T, want ErrorStats", d.s)
}
if d.ctx == nil {
@ -512,10 +512,10 @@ func TestServerStatsUnaryRPC(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.ClientStats() {
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
@ -538,11 +538,11 @@ func TestServerStatsUnaryRPC(t *testing.T) {
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkIncomingHeaderStats,
checkIncomingPayloadStats,
checkOutgoingHeaderStats,
checkOutgoingPayloadStats,
checkOutgoingTrailerStats,
checkInHeader,
checkInPayload,
checkOutHeader,
checkOutPayload,
checkOutTrailer,
}
if len(got) != len(checkFuncs) {
@ -563,10 +563,10 @@ func TestServerStatsUnaryRPCError(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.ClientStats() {
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
@ -590,10 +590,10 @@ func TestServerStatsUnaryRPCError(t *testing.T) {
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkIncomingHeaderStats,
checkIncomingPayloadStats,
checkOutgoingHeaderStats,
checkOutgoingTrailerStats,
checkInHeader,
checkInPayload,
checkOutHeader,
checkOutTrailer,
checkErrorStats,
}
@ -615,10 +615,10 @@ func TestServerStatsStreamingRPC(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.ClientStats() {
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
@ -643,17 +643,17 @@ func TestServerStatsStreamingRPC(t *testing.T) {
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkIncomingHeaderStats,
checkOutgoingHeaderStats,
checkInHeader,
checkOutHeader,
}
ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkIncomingPayloadStats,
checkOutgoingPayloadStats,
checkInPayload,
checkOutPayload,
}
for i := 0; i < count; i++ {
checkFuncs = append(checkFuncs, ioPayFuncs...)
}
checkFuncs = append(checkFuncs, checkOutgoingTrailerStats)
checkFuncs = append(checkFuncs, checkOutTrailer)
if len(got) != len(checkFuncs) {
t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
@ -673,10 +673,10 @@ func TestServerStatsStreamingRPCError(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.ClientStats() {
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
@ -702,10 +702,10 @@ func TestServerStatsStreamingRPCError(t *testing.T) {
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkIncomingHeaderStats,
checkOutgoingHeaderStats,
checkIncomingPayloadStats,
checkOutgoingTrailerStats,
checkInHeader,
checkOutHeader,
checkInPayload,
checkOutTrailer,
checkErrorStats,
}
@ -732,10 +732,10 @@ func TestClientStatsUnaryRPC(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.ClientStats() {
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
@ -758,11 +758,11 @@ func TestClientStatsUnaryRPC(t *testing.T) {
}
checkFuncs := map[int]*checkFuncWithCount{
outheader: &checkFuncWithCount{checkOutgoingHeaderStats, 1},
outpay: &checkFuncWithCount{checkOutgoingPayloadStats, 1},
inheader: &checkFuncWithCount{checkIncomingHeaderStats, 1},
inpay: &checkFuncWithCount{checkIncomingPayloadStats, 1},
intrailer: &checkFuncWithCount{checkIncomingTrailerStats, 1},
outheader: &checkFuncWithCount{checkOutHeader, 1},
outpay: &checkFuncWithCount{checkOutPayload, 1},
inheader: &checkFuncWithCount{checkInHeader, 1},
inpay: &checkFuncWithCount{checkInPayload, 1},
intrailer: &checkFuncWithCount{checkInTrailer, 1},
}
var expectLen int
@ -776,31 +776,31 @@ func TestClientStatsUnaryRPC(t *testing.T) {
for _, s := range got {
mu.Lock()
switch s.s.(type) {
case *stats.OutgoingHeaderStats:
case *stats.OutHeader:
if checkFuncs[outheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[outheader].f(t, s, expect)
checkFuncs[outheader].c--
case *stats.OutgoingPayloadStats:
case *stats.OutPayload:
if checkFuncs[outpay].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[outpay].f(t, s, expect)
checkFuncs[outpay].c--
case *stats.IncomingHeaderStats:
case *stats.InHeader:
if checkFuncs[inheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[inheader].f(t, s, expect)
checkFuncs[inheader].c--
case *stats.IncomingPayloadStats:
case *stats.InPayload:
if checkFuncs[inpay].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[inpay].f(t, s, expect)
checkFuncs[inpay].c--
case *stats.IncomingTrailerStats:
case *stats.InTrailer:
if checkFuncs[intrailer].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
@ -820,10 +820,10 @@ func TestClientStatsUnaryRPCError(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.ClientStats() {
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
@ -847,10 +847,10 @@ func TestClientStatsUnaryRPCError(t *testing.T) {
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkOutgoingHeaderStats,
checkOutgoingPayloadStats,
checkIncomingHeaderStats,
checkIncomingTrailerStats,
checkOutHeader,
checkOutPayload,
checkInHeader,
checkInTrailer,
checkErrorStats,
}
@ -872,10 +872,10 @@ func TestClientStatsStreamingRPC(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.ClientStats() {
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
@ -900,11 +900,11 @@ func TestClientStatsStreamingRPC(t *testing.T) {
}
checkFuncs := map[int]*checkFuncWithCount{
outheader: &checkFuncWithCount{checkOutgoingHeaderStats, 1},
outpay: &checkFuncWithCount{checkOutgoingPayloadStats, count},
inheader: &checkFuncWithCount{checkIncomingHeaderStats, 1},
inpay: &checkFuncWithCount{checkIncomingPayloadStats, count},
intrailer: &checkFuncWithCount{checkIncomingTrailerStats, 1},
outheader: &checkFuncWithCount{checkOutHeader, 1},
outpay: &checkFuncWithCount{checkOutPayload, count},
inheader: &checkFuncWithCount{checkInHeader, 1},
inpay: &checkFuncWithCount{checkInPayload, count},
intrailer: &checkFuncWithCount{checkInTrailer, 1},
}
var expectLen int
@ -918,31 +918,31 @@ func TestClientStatsStreamingRPC(t *testing.T) {
for _, s := range got {
mu.Lock()
switch s.s.(type) {
case *stats.OutgoingHeaderStats:
case *stats.OutHeader:
if checkFuncs[outheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[outheader].f(t, s, expect)
checkFuncs[outheader].c--
case *stats.OutgoingPayloadStats:
case *stats.OutPayload:
if checkFuncs[outpay].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[outpay].f(t, s, expect)
checkFuncs[outpay].c--
case *stats.IncomingHeaderStats:
case *stats.InHeader:
if checkFuncs[inheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[inheader].f(t, s, expect)
checkFuncs[inheader].c--
case *stats.IncomingPayloadStats:
case *stats.InPayload:
if checkFuncs[inpay].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[inpay].f(t, s, expect)
checkFuncs[inpay].c--
case *stats.IncomingTrailerStats:
case *stats.InTrailer:
if checkFuncs[intrailer].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
@ -962,10 +962,10 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
mu sync.Mutex
got []*gotData
)
stats.RegisterHandler(func(ctx context.Context, s stats.Stats) {
stats.RegisterHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.ClientStats() {
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
@ -991,10 +991,10 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
}
checkFuncs := map[int]*checkFuncWithCount{
outheader: &checkFuncWithCount{checkOutgoingHeaderStats, 1},
outpay: &checkFuncWithCount{checkOutgoingPayloadStats, 1},
inheader: &checkFuncWithCount{checkIncomingHeaderStats, 1},
intrailer: &checkFuncWithCount{checkIncomingTrailerStats, 1},
outheader: &checkFuncWithCount{checkOutHeader, 1},
outpay: &checkFuncWithCount{checkOutPayload, 1},
inheader: &checkFuncWithCount{checkInHeader, 1},
intrailer: &checkFuncWithCount{checkInTrailer, 1},
errors: &checkFuncWithCount{checkErrorStats, 1},
}
@ -1009,37 +1009,37 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
for _, s := range got {
mu.Lock()
switch s.s.(type) {
case *stats.OutgoingHeaderStats:
case *stats.OutHeader:
if checkFuncs[outheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[outheader].f(t, s, expect)
checkFuncs[outheader].c--
case *stats.OutgoingPayloadStats:
case *stats.OutPayload:
if checkFuncs[outpay].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[outpay].f(t, s, expect)
checkFuncs[outpay].c--
case *stats.IncomingHeaderStats:
case *stats.InHeader:
if checkFuncs[inheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[inheader].f(t, s, expect)
checkFuncs[inheader].c--
case *stats.IncomingPayloadStats:
case *stats.InPayload:
if checkFuncs[inpay].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[inpay].f(t, s, expect)
checkFuncs[inpay].c--
case *stats.IncomingTrailerStats:
case *stats.InTrailer:
if checkFuncs[intrailer].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[intrailer].f(t, s, expect)
checkFuncs[intrailer].c--
case *stats.ErrorStats:
case *stats.RPCErr:
if checkFuncs[errors].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}

View File

@ -101,9 +101,9 @@ type ClientStream interface {
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
defer func() {
if err != nil && stats.On() {
errorStats := &stats.ErrorStats{
IsClient: true,
Error: err,
errorStats := &stats.RPCErr{
Client: true,
Error: err,
}
stats.Handle(ctx, errorStats)
}
@ -265,9 +265,9 @@ func (cs *clientStream) Context() context.Context {
func (cs *clientStream) Header() (_ metadata.MD, err error) {
defer func() {
if err != nil && stats.On() {
errorStats := &stats.ErrorStats{
IsClient: true,
Error: err,
errorStats := &stats.RPCErr{
Client: true,
Error: err,
}
stats.Handle(cs.s.Context(), errorStats)
}
@ -295,9 +295,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
defer func() {
if err != nil && stats.On() {
errorStats := &stats.ErrorStats{
IsClient: true,
Error: err,
errorStats := &stats.RPCErr{
Client: true,
Error: err,
}
stats.Handle(cs.s.Context(), errorStats)
}
@ -324,13 +324,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
err = toRPCErr(err)
}()
var outgoingPayloadStats *stats.OutgoingPayloadStats
var outStats *stats.OutPayload
if stats.On() {
outgoingPayloadStats = &stats.OutgoingPayloadStats{
IsClient: true,
outStats = &stats.OutPayload{
Client: true,
}
}
out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outgoingPayloadStats)
out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outStats)
defer func() {
if cs.cbuf != nil {
cs.cbuf.Reset()
@ -340,9 +340,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
return Errorf(codes.Internal, "grpc: %v", err)
}
err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if outgoingPayloadStats != nil {
outgoingPayloadStats.SentTime = time.Now()
stats.Handle(cs.s.Context(), outgoingPayloadStats)
if outStats != nil {
outStats.SentTime = time.Now()
stats.Handle(cs.s.Context(), outStats)
}
return err
}
@ -350,20 +350,20 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
defer func() {
if err != nil && err != io.EOF && stats.On() {
errorStats := &stats.ErrorStats{
IsClient: true,
Error: err,
errorStats := &stats.RPCErr{
Client: true,
Error: err,
}
stats.Handle(cs.s.Context(), errorStats)
}
}()
var incomingPayloadStats *stats.IncomingPayloadStats
var inStats *stats.InPayload
if stats.On() {
incomingPayloadStats = &stats.IncomingPayloadStats{
IsClient: true,
inStats = &stats.InPayload{
Client: true,
}
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, incomingPayloadStats)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inStats)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@ -378,14 +378,14 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
if incomingPayloadStats != nil {
stats.Handle(cs.s.Context(), incomingPayloadStats)
if inStats != nil {
stats.Handle(cs.s.Context(), inStats)
}
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect incomingPayloadStats.
// This recv expects EOF or errors, so we don't collect inStats.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
cs.closeTransportStream(err)
if err == nil {
@ -540,11 +540,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
var outgoingPayloadStats *stats.OutgoingPayloadStats
var outStats *stats.OutPayload
if stats.On() {
outgoingPayloadStats = &stats.OutgoingPayloadStats{}
outStats = &stats.OutPayload{}
}
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outgoingPayloadStats)
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outStats)
defer func() {
if ss.cbuf != nil {
ss.cbuf.Reset()
@ -557,9 +557,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if outgoingPayloadStats != nil {
outgoingPayloadStats.SentTime = time.Now()
stats.Handle(ss.s.Context(), outgoingPayloadStats)
if outStats != nil {
outStats.SentTime = time.Now()
stats.Handle(ss.s.Context(), outStats)
}
return nil
}
@ -579,11 +579,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
var incomingPayloadStats *stats.IncomingPayloadStats
var inStats *stats.InPayload
if stats.On() {
incomingPayloadStats = &stats.IncomingPayloadStats{}
inStats = &stats.InPayload{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, incomingPayloadStats); err != nil {
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inStats); err != nil {
if err == io.EOF {
return err
}
@ -592,8 +592,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
if incomingPayloadStats != nil {
stats.Handle(ss.s.Context(), incomingPayloadStats)
if inStats != nil {
stats.Handle(ss.s.Context(), inStats)
}
return nil
}

View File

@ -450,15 +450,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
if stats.On() {
outgoingHeaderStats := &stats.OutgoingHeaderStats{
IsClient: true,
outHeader := &stats.OutHeader{
Client: true,
WireLength: bufLen,
Method: callHdr.Method,
RemoteAddr: t.RemoteAddr(),
LocalAddr: t.LocalAddr(),
Encryption: callHdr.SendCompress,
}
stats.Handle(s.Context(), outgoingHeaderStats)
stats.Handle(s.Context(), outHeader)
}
t.writableChan <- 0
return s, nil
@ -891,17 +891,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
defer func() {
if stats.On() {
if isHeader {
incomingHeaderStats := &stats.IncomingHeaderStats{
IsClient: true,
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
}
stats.Handle(s.ctx, incomingHeaderStats)
stats.Handle(s.ctx, inHeader)
} else {
incomingTrailerStats := &stats.IncomingTrailerStats{
IsClient: true,
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
}
stats.Handle(s.ctx, incomingTrailerStats)
stats.Handle(s.ctx, inTrailer)
}
}
}()

View File

@ -236,14 +236,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.updateWindow(s, uint32(n))
}
if stats.On() {
incomingHeaderStats := &stats.IncomingHeaderStats{
inHeader := &stats.InHeader{
Method: s.method,
RemoteAddr: t.conn.RemoteAddr(),
LocalAddr: t.conn.LocalAddr(),
Encryption: s.recvCompress,
WireLength: int(frame.Header().Length),
}
stats.Handle(s.ctx, incomingHeaderStats)
stats.Handle(s.ctx, inHeader)
}
handle(s)
return
@ -524,10 +524,10 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
return err
}
if stats.On() {
outgoingHeaderStats := &stats.OutgoingHeaderStats{
outHeader := &stats.OutHeader{
WireLength: bufLen,
}
stats.Handle(s.Context(), outgoingHeaderStats)
stats.Handle(s.Context(), outHeader)
}
t.writableChan <- 0
return nil
@ -587,10 +587,10 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
return err
}
if stats.On() {
outgoingTrailerStats := &stats.OutgoingTrailerStats{
outTrailer := &stats.OutTrailer{
WireLength: bufLen,
}
stats.Handle(s.Context(), outgoingTrailerStats)
stats.Handle(s.Context(), outTrailer)
}
t.closeStream(s)
t.writableChan <- 0