mirror of https://github.com/grpc/grpc-go.git
transport: use prefix logging (#6135)
This commit is contained in:
parent
9c25653be0
commit
a8a25ce994
|
|
@ -30,6 +30,7 @@ import (
|
||||||
|
|
||||||
"golang.org/x/net/http2"
|
"golang.org/x/net/http2"
|
||||||
"golang.org/x/net/http2/hpack"
|
"golang.org/x/net/http2/hpack"
|
||||||
|
"google.golang.org/grpc/internal/grpclog"
|
||||||
"google.golang.org/grpc/internal/grpcutil"
|
"google.golang.org/grpc/internal/grpcutil"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
@ -488,12 +489,13 @@ type loopyWriter struct {
|
||||||
bdpEst *bdpEstimator
|
bdpEst *bdpEstimator
|
||||||
draining bool
|
draining bool
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
|
logger *grpclog.PrefixLogger
|
||||||
|
|
||||||
// Side-specific handlers
|
// Side-specific handlers
|
||||||
ssGoAwayHandler func(*goAway) (bool, error)
|
ssGoAwayHandler func(*goAway) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
|
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
l := &loopyWriter{
|
l := &loopyWriter{
|
||||||
side: s,
|
side: s,
|
||||||
|
|
@ -507,6 +509,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
|
||||||
hEnc: hpack.NewEncoder(&buf),
|
hEnc: hpack.NewEncoder(&buf),
|
||||||
bdpEst: bdpEst,
|
bdpEst: bdpEst,
|
||||||
conn: conn,
|
conn: conn,
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
@ -536,8 +539,8 @@ const minBatchSize = 1000
|
||||||
// left open to allow the I/O error to be encountered by the reader instead.
|
// left open to allow the I/O error to be encountered by the reader instead.
|
||||||
func (l *loopyWriter) run() (err error) {
|
func (l *loopyWriter) run() (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if logger.V(logLevel) {
|
if l.logger.V(logLevel) {
|
||||||
logger.Infof("transport: loopyWriter exiting with error: %v", err)
|
l.logger.Infof("loopyWriter exiting with error: %v", err)
|
||||||
}
|
}
|
||||||
if !isIOError(err) {
|
if !isIOError(err) {
|
||||||
l.framer.writer.Flush()
|
l.framer.writer.Flush()
|
||||||
|
|
@ -636,8 +639,8 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||||
if l.side == serverSide {
|
if l.side == serverSide {
|
||||||
str, ok := l.estdStreams[h.streamID]
|
str, ok := l.estdStreams[h.streamID]
|
||||||
if !ok {
|
if !ok {
|
||||||
if logger.V(logLevel) {
|
if l.logger.V(logLevel) {
|
||||||
logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
|
l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -692,8 +695,8 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
|
||||||
l.hBuf.Reset()
|
l.hBuf.Reset()
|
||||||
for _, f := range hf {
|
for _, f := range hf {
|
||||||
if err := l.hEnc.WriteField(f); err != nil {
|
if err := l.hEnc.WriteField(f); err != nil {
|
||||||
if logger.V(logLevel) {
|
if l.logger.V(logLevel) {
|
||||||
logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
|
l.logger.Warningf("Encountered error while encoding headers: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ import (
|
||||||
"golang.org/x/net/http2"
|
"golang.org/x/net/http2"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/internal/grpclog"
|
||||||
"google.golang.org/grpc/internal/grpcutil"
|
"google.golang.org/grpc/internal/grpcutil"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
|
|
@ -83,6 +84,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
|
||||||
contentSubtype: contentSubtype,
|
contentSubtype: contentSubtype,
|
||||||
stats: stats,
|
stats: stats,
|
||||||
}
|
}
|
||||||
|
st.logger = prefixLoggerForServerHandlerTransport(st)
|
||||||
|
|
||||||
if v := r.Header.Get("grpc-timeout"); v != "" {
|
if v := r.Header.Get("grpc-timeout"); v != "" {
|
||||||
to, err := decodeTimeout(v)
|
to, err := decodeTimeout(v)
|
||||||
|
|
@ -151,12 +153,13 @@ type serverHandlerTransport struct {
|
||||||
contentSubtype string
|
contentSubtype string
|
||||||
|
|
||||||
stats []stats.Handler
|
stats []stats.Handler
|
||||||
|
logger *grpclog.PrefixLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ht *serverHandlerTransport) Close(err error) {
|
func (ht *serverHandlerTransport) Close(err error) {
|
||||||
ht.closeOnce.Do(func() {
|
ht.closeOnce.Do(func() {
|
||||||
if logger.V(logLevel) {
|
if ht.logger.V(logLevel) {
|
||||||
logger.Infof("Closing serverHandlerTransport: %v", err)
|
ht.logger.Infof("Closing: %v", err)
|
||||||
}
|
}
|
||||||
close(ht.closedCh)
|
close(ht.closedCh)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/internal/channelz"
|
"google.golang.org/grpc/internal/channelz"
|
||||||
icredentials "google.golang.org/grpc/internal/credentials"
|
icredentials "google.golang.org/grpc/internal/credentials"
|
||||||
|
"google.golang.org/grpc/internal/grpclog"
|
||||||
"google.golang.org/grpc/internal/grpcsync"
|
"google.golang.org/grpc/internal/grpcsync"
|
||||||
"google.golang.org/grpc/internal/grpcutil"
|
"google.golang.org/grpc/internal/grpcutil"
|
||||||
imetadata "google.golang.org/grpc/internal/metadata"
|
imetadata "google.golang.org/grpc/internal/metadata"
|
||||||
|
|
@ -145,6 +146,7 @@ type http2Client struct {
|
||||||
bufferPool *bufferPool
|
bufferPool *bufferPool
|
||||||
|
|
||||||
connectionID uint64
|
connectionID uint64
|
||||||
|
logger *grpclog.PrefixLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
|
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
|
||||||
|
|
@ -244,7 +246,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||||
if err := connectCtx.Err(); err != nil {
|
if err := connectCtx.Err(); err != nil {
|
||||||
// connectCtx expired before exiting the function. Hard close the connection.
|
// connectCtx expired before exiting the function. Hard close the connection.
|
||||||
if logger.V(logLevel) {
|
if logger.V(logLevel) {
|
||||||
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
|
logger.Infof("Aborting due to connect deadline expiring: %v", err)
|
||||||
}
|
}
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
@ -346,6 +348,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||||
bufferPool: newBufferPool(),
|
bufferPool: newBufferPool(),
|
||||||
onClose: onClose,
|
onClose: onClose,
|
||||||
}
|
}
|
||||||
|
t.logger = prefixLoggerForClientTransport(t)
|
||||||
// Add peer information to the http2client context.
|
// Add peer information to the http2client context.
|
||||||
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
||||||
|
|
||||||
|
|
@ -444,7 +447,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
|
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
|
||||||
t.loopy.run()
|
t.loopy.run()
|
||||||
close(t.writerDone)
|
close(t.writerDone)
|
||||||
}()
|
}()
|
||||||
|
|
@ -859,8 +862,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if transportDrainRequired {
|
if transportDrainRequired {
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: t.nextID > MaxStreamID. Draining")
|
t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
|
||||||
}
|
}
|
||||||
t.GracefulClose()
|
t.GracefulClose()
|
||||||
}
|
}
|
||||||
|
|
@ -952,8 +955,8 @@ func (t *http2Client) Close(err error) {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: closing: %v", err)
|
t.logger.Infof("Closing: %v", err)
|
||||||
}
|
}
|
||||||
// Call t.onClose ASAP to prevent the client from attempting to create new
|
// Call t.onClose ASAP to prevent the client from attempting to create new
|
||||||
// streams.
|
// streams.
|
||||||
|
|
@ -1009,8 +1012,8 @@ func (t *http2Client) GracefulClose() {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: GracefulClose called")
|
t.logger.Infof("GracefulClose called")
|
||||||
}
|
}
|
||||||
t.onClose(GoAwayInvalid)
|
t.onClose(GoAwayInvalid)
|
||||||
t.state = draining
|
t.state = draining
|
||||||
|
|
@ -1174,8 +1177,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||||
}
|
}
|
||||||
statusCode, ok := http2ErrConvTab[f.ErrCode]
|
statusCode, ok := http2ErrConvTab[f.ErrCode]
|
||||||
if !ok {
|
if !ok {
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
|
t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
|
||||||
}
|
}
|
||||||
statusCode = codes.Unknown
|
statusCode = codes.Unknown
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,9 @@ import (
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"golang.org/x/net/http2"
|
"golang.org/x/net/http2"
|
||||||
"golang.org/x/net/http2/hpack"
|
"golang.org/x/net/http2/hpack"
|
||||||
|
"google.golang.org/grpc/internal/grpclog"
|
||||||
"google.golang.org/grpc/internal/grpcutil"
|
"google.golang.org/grpc/internal/grpcutil"
|
||||||
|
"google.golang.org/grpc/internal/pretty"
|
||||||
"google.golang.org/grpc/internal/syscall"
|
"google.golang.org/grpc/internal/syscall"
|
||||||
|
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
|
@ -129,6 +131,8 @@ type http2Server struct {
|
||||||
// This lock may not be taken if mu is already held.
|
// This lock may not be taken if mu is already held.
|
||||||
maxStreamMu sync.Mutex
|
maxStreamMu sync.Mutex
|
||||||
maxStreamID uint32 // max stream ID ever seen
|
maxStreamID uint32 // max stream ID ever seen
|
||||||
|
|
||||||
|
logger *grpclog.PrefixLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServerTransport creates a http2 transport with conn and configuration
|
// NewServerTransport creates a http2 transport with conn and configuration
|
||||||
|
|
@ -267,6 +271,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||||
czData: new(channelzData),
|
czData: new(channelzData),
|
||||||
bufferPool: newBufferPool(),
|
bufferPool: newBufferPool(),
|
||||||
}
|
}
|
||||||
|
t.logger = prefixLoggerForServerTransport(t)
|
||||||
// Add peer information to the http2server context.
|
// Add peer information to the http2server context.
|
||||||
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
||||||
|
|
||||||
|
|
@ -331,7 +336,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||||
t.handleSettings(sf)
|
t.handleSettings(sf)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
|
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
|
||||||
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
||||||
t.loopy.run()
|
t.loopy.run()
|
||||||
close(t.writerDone)
|
close(t.writerDone)
|
||||||
|
|
@ -425,8 +430,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
// "Transports must consider requests containing the Connection header
|
// "Transports must consider requests containing the Connection header
|
||||||
// as malformed." - A41
|
// as malformed." - A41
|
||||||
case "connection":
|
case "connection":
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
|
t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
|
||||||
}
|
}
|
||||||
protocolError = true
|
protocolError = true
|
||||||
default:
|
default:
|
||||||
|
|
@ -436,7 +441,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
v, err := decodeMetadataHeader(hf.Name, hf.Value)
|
v, err := decodeMetadataHeader(hf.Name, hf.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
|
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
|
||||||
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
|
t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
mdata[hf.Name] = append(mdata[hf.Name], v)
|
mdata[hf.Name] = append(mdata[hf.Name], v)
|
||||||
|
|
@ -450,8 +455,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
// error, this takes precedence over a client not speaking gRPC.
|
// error, this takes precedence over a client not speaking gRPC.
|
||||||
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
|
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
|
||||||
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
|
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Errorf("transport: %v", errMsg)
|
t.logger.Infof("Aborting the stream early: %v", errMsg)
|
||||||
}
|
}
|
||||||
t.controlBuf.put(&earlyAbortStream{
|
t.controlBuf.put(&earlyAbortStream{
|
||||||
httpStatus: http.StatusBadRequest,
|
httpStatus: http.StatusBadRequest,
|
||||||
|
|
@ -545,9 +550,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
}
|
}
|
||||||
if httpMethod != http.MethodPost {
|
if httpMethod != http.MethodPost {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
|
errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: %v", errMsg)
|
t.logger.Infof("Aborting the stream early: %v", errMsg)
|
||||||
}
|
}
|
||||||
t.controlBuf.put(&earlyAbortStream{
|
t.controlBuf.put(&earlyAbortStream{
|
||||||
httpStatus: 405,
|
httpStatus: 405,
|
||||||
|
|
@ -563,8 +568,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
var err error
|
var err error
|
||||||
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
|
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
|
t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
|
||||||
}
|
}
|
||||||
stat, ok := status.FromError(err)
|
stat, ok := status.FromError(err)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
@ -638,8 +643,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||||
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
|
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if se, ok := err.(http2.StreamError); ok {
|
if se, ok := err.(http2.StreamError); ok {
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
|
t.logger.Warningf("Encountered http2.StreamError: %v", se)
|
||||||
}
|
}
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
s := t.activeStreams[se.StreamID]
|
s := t.activeStreams[se.StreamID]
|
||||||
|
|
@ -682,8 +687,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||||
case *http2.GoAwayFrame:
|
case *http2.GoAwayFrame:
|
||||||
// TODO: Handle GoAway from the client appropriately.
|
// TODO: Handle GoAway from the client appropriately.
|
||||||
default:
|
default:
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
|
t.logger.Infof("Received unsupported frame type %T", frame)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -942,8 +947,8 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
|
||||||
var sz int64
|
var sz int64
|
||||||
for _, f := range hdrFrame.hf {
|
for _, f := range hdrFrame.hf {
|
||||||
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
|
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
|
t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -1056,7 +1061,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||||
stBytes, err := proto.Marshal(p)
|
stBytes, err := proto.Marshal(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: return error instead, when callers are able to handle it.
|
// TODO: return error instead, when callers are able to handle it.
|
||||||
logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
|
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
|
||||||
} else {
|
} else {
|
||||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
|
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
|
||||||
}
|
}
|
||||||
|
|
@ -1171,8 +1176,8 @@ func (t *http2Server) keepalive() {
|
||||||
select {
|
select {
|
||||||
case <-ageTimer.C:
|
case <-ageTimer.C:
|
||||||
// Close the connection after grace period.
|
// Close the connection after grace period.
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: closing server transport due to maximum connection age.")
|
t.logger.Infof("Closing server transport due to maximum connection age")
|
||||||
}
|
}
|
||||||
t.controlBuf.put(closeConnection{})
|
t.controlBuf.put(closeConnection{})
|
||||||
case <-t.done:
|
case <-t.done:
|
||||||
|
|
@ -1223,8 +1228,8 @@ func (t *http2Server) Close(err error) {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if logger.V(logLevel) {
|
if t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: closing: %v", err)
|
t.logger.Infof("Closing: %v", err)
|
||||||
}
|
}
|
||||||
t.state = closing
|
t.state = closing
|
||||||
streams := t.activeStreams
|
streams := t.activeStreams
|
||||||
|
|
@ -1232,8 +1237,8 @@ func (t *http2Server) Close(err error) {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
t.controlBuf.finish()
|
t.controlBuf.finish()
|
||||||
close(t.done)
|
close(t.done)
|
||||||
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
|
if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
|
||||||
logger.Infof("transport: error closing conn during Close: %v", err)
|
t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
|
||||||
}
|
}
|
||||||
channelz.RemoveEntry(t.channelzID)
|
channelz.RemoveEntry(t.channelzID)
|
||||||
// Cancel all active streams.
|
// Cancel all active streams.
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,6 @@ import (
|
||||||
"golang.org/x/net/http2/hpack"
|
"golang.org/x/net/http2/hpack"
|
||||||
spb "google.golang.org/genproto/googleapis/rpc/status"
|
spb "google.golang.org/genproto/googleapis/rpc/status"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/grpclog"
|
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -86,7 +85,6 @@ var (
|
||||||
// 504 Gateway timeout - UNAVAILABLE.
|
// 504 Gateway timeout - UNAVAILABLE.
|
||||||
http.StatusGatewayTimeout: codes.Unavailable,
|
http.StatusGatewayTimeout: codes.Unavailable,
|
||||||
}
|
}
|
||||||
logger = grpclog.Component("transport")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// isReservedHeader checks whether hdr belongs to HTTP2 headers
|
// isReservedHeader checks whether hdr belongs to HTTP2 headers
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2023 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
|
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
||||||
|
)
|
||||||
|
|
||||||
|
var logger = grpclog.Component("transport")
|
||||||
|
|
||||||
|
func prefixLoggerForServerTransport(p *http2Server) *internalgrpclog.PrefixLogger {
|
||||||
|
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-transport %p] ", p))
|
||||||
|
}
|
||||||
|
|
||||||
|
func prefixLoggerForServerHandlerTransport(p *serverHandlerTransport) *internalgrpclog.PrefixLogger {
|
||||||
|
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-handler-transport %p] ", p))
|
||||||
|
}
|
||||||
|
|
||||||
|
func prefixLoggerForClientTransport(p *http2Client) *internalgrpclog.PrefixLogger {
|
||||||
|
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[client-transport %p] ", p))
|
||||||
|
}
|
||||||
|
|
@ -5668,7 +5668,7 @@ func (s) TestStatusInvalidUTF8Message(t *testing.T) {
|
||||||
// will fail to marshal the status because of the invalid utf8 message. Details
|
// will fail to marshal the status because of the invalid utf8 message. Details
|
||||||
// will be dropped when sending.
|
// will be dropped when sending.
|
||||||
func (s) TestStatusInvalidUTF8Details(t *testing.T) {
|
func (s) TestStatusInvalidUTF8Details(t *testing.T) {
|
||||||
grpctest.TLogger.ExpectError("transport: failed to marshal rpc status")
|
grpctest.TLogger.ExpectError("Failed to marshal rpc status")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
origMsg = string([]byte{0xff, 0xfe, 0xfd})
|
origMsg = string([]byte{0xff, 0xfe, 0xfd})
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue