mirror of https://github.com/grpc/grpc-go.git
				
				
				
			Unexport transport.StreamErrorf and transport.ConnectionErrorf
This commit is contained in:
		
							parent
							
								
									5060203263
								
							
						
					
					
						commit
						086edd7dfd
					
				| 
						 | 
				
			
			@ -152,7 +152,7 @@ func TestToRPCErr(t *testing.T) {
 | 
			
		|||
		// outputs
 | 
			
		||||
		errOut *rpcError
 | 
			
		||||
	}{
 | 
			
		||||
		{transport.StreamErrorf(codes.Unknown, ""), Errorf(codes.Unknown, "").(*rpcError)},
 | 
			
		||||
		{transport.StreamError{codes.Unknown, ""}, Errorf(codes.Unknown, "").(*rpcError)},
 | 
			
		||||
		{transport.ErrConnClosing, Errorf(codes.Internal, transport.ErrConnClosing.Desc).(*rpcError)},
 | 
			
		||||
	} {
 | 
			
		||||
		err := toRPCErr(test.errIn)
 | 
			
		||||
| 
						 | 
				
			
			@ -173,8 +173,8 @@ func TestContextErr(t *testing.T) {
 | 
			
		|||
		// outputs
 | 
			
		||||
		errOut transport.StreamError
 | 
			
		||||
	}{
 | 
			
		||||
		{context.DeadlineExceeded, transport.StreamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)},
 | 
			
		||||
		{context.Canceled, transport.StreamErrorf(codes.Canceled, "%v", context.Canceled)},
 | 
			
		||||
		{context.DeadlineExceeded, transport.StreamError{codes.DeadlineExceeded, context.DeadlineExceeded.Error()}},
 | 
			
		||||
		{context.Canceled, transport.StreamError{codes.Canceled, context.Canceled.Error()}},
 | 
			
		||||
	} {
 | 
			
		||||
		err := transport.ContextErr(test.errIn)
 | 
			
		||||
		if err != test.errOut {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -85,7 +85,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
 | 
			
		|||
	if v := r.Header.Get("grpc-timeout"); v != "" {
 | 
			
		||||
		to, err := decodeTimeout(v)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err)
 | 
			
		||||
			return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		st.timeoutSet = true
 | 
			
		||||
		st.timeout = to
 | 
			
		||||
| 
						 | 
				
			
			@ -393,5 +393,5 @@ func mapRecvMsgError(err error) error {
 | 
			
		|||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ConnectionErrorf(true, err, err.Error())
 | 
			
		||||
	return connectionErrorf(true, err, err.Error())
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -149,7 +149,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
 | 
			
		|||
	scheme := "http"
 | 
			
		||||
	conn, err := dial(opts.Dialer, ctx, addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// Any further errors will close the underlying connection
 | 
			
		||||
	defer func(conn net.Conn) {
 | 
			
		||||
| 
						 | 
				
			
			@ -165,7 +165,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
 | 
			
		|||
			// Credentials handshake errors are typically considered permanent
 | 
			
		||||
			// to avoid retrying on e.g. bad certificates.
 | 
			
		||||
			temp := isTemporary(err)
 | 
			
		||||
			return nil, ConnectionErrorf(temp, err, "transport: %v", err)
 | 
			
		||||
			return nil, connectionErrorf(temp, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	ua := primaryUA
 | 
			
		||||
| 
						 | 
				
			
			@ -205,11 +205,11 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
 | 
			
		|||
	n, err := t.conn.Write(clientPreface)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Close()
 | 
			
		||||
		return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if n != len(clientPreface) {
 | 
			
		||||
		t.Close()
 | 
			
		||||
		return nil, ConnectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
 | 
			
		||||
		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
 | 
			
		||||
	}
 | 
			
		||||
	if initialWindowSize != defaultWindowSize {
 | 
			
		||||
		err = t.framer.writeSettings(true, http2.Setting{
 | 
			
		||||
| 
						 | 
				
			
			@ -221,13 +221,13 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
 | 
			
		|||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Close()
 | 
			
		||||
		return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// Adjust the connection flow control window if needed.
 | 
			
		||||
	if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
 | 
			
		||||
		if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
 | 
			
		||||
			t.Close()
 | 
			
		||||
			return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	go t.controller()
 | 
			
		||||
| 
						 | 
				
			
			@ -295,12 +295,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 | 
			
		|||
		}
 | 
			
		||||
		pos := strings.LastIndex(callHdr.Method, "/")
 | 
			
		||||
		if pos == -1 {
 | 
			
		||||
			return nil, StreamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
 | 
			
		||||
			return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
 | 
			
		||||
		}
 | 
			
		||||
		audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
 | 
			
		||||
		data, err := c.GetRequestMetadata(ctx, audience)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
 | 
			
		||||
			return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		for k, v := range data {
 | 
			
		||||
			authData[k] = v
 | 
			
		||||
| 
						 | 
				
			
			@ -437,7 +437,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 | 
			
		|||
		}
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.notifyError(err)
 | 
			
		||||
			return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	t.writableChan <- 0
 | 
			
		||||
| 
						 | 
				
			
			@ -651,7 +651,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
 | 
			
		|||
		// invoked.
 | 
			
		||||
		if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
 | 
			
		||||
			t.notifyError(err)
 | 
			
		||||
			return ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if t.framer.adjustNumWriters(-1) == 0 {
 | 
			
		||||
			t.framer.flushWrite()
 | 
			
		||||
| 
						 | 
				
			
			@ -699,7 +699,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
 | 
			
		|||
func (t *http2Client) handleData(f *http2.DataFrame) {
 | 
			
		||||
	size := len(f.Data())
 | 
			
		||||
	if err := t.fc.onData(uint32(size)); err != nil {
 | 
			
		||||
		t.notifyError(ConnectionErrorf(true, err, "%v", err))
 | 
			
		||||
		t.notifyError(connectionErrorf(true, err, "%v", err))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// Select the right stream to dispatch.
 | 
			
		||||
| 
						 | 
				
			
			@ -805,7 +805,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
 | 
			
		|||
	if t.state == reachable || t.state == draining {
 | 
			
		||||
		if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
 | 
			
		||||
			t.mu.Unlock()
 | 
			
		||||
			t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
 | 
			
		||||
			t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		select {
 | 
			
		||||
| 
						 | 
				
			
			@ -814,7 +814,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
 | 
			
		|||
			// t.goAway has been closed (i.e.,multiple GoAways).
 | 
			
		||||
			if id < f.LastStreamID {
 | 
			
		||||
				t.mu.Unlock()
 | 
			
		||||
				t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
 | 
			
		||||
				t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			t.prevGoAwayID = id
 | 
			
		||||
| 
						 | 
				
			
			@ -929,7 +929,7 @@ func (t *http2Client) reader() {
 | 
			
		|||
				t.mu.Unlock()
 | 
			
		||||
				if s != nil {
 | 
			
		||||
					// use error detail to provide better err message
 | 
			
		||||
					handleMalformedHTTP2(s, StreamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
 | 
			
		||||
					handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
 | 
			
		||||
				}
 | 
			
		||||
				continue
 | 
			
		||||
			} else {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -111,12 +111,12 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
 | 
			
		|||
			Val: uint32(initialWindowSize)})
 | 
			
		||||
	}
 | 
			
		||||
	if err := framer.writeSettings(true, settings...); err != nil {
 | 
			
		||||
		return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// Adjust the connection flow control window if needed.
 | 
			
		||||
	if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
 | 
			
		||||
		if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
 | 
			
		||||
			return nil, ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return nil, connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	var buf bytes.Buffer
 | 
			
		||||
| 
						 | 
				
			
			@ -448,7 +448,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
 | 
			
		|||
		}
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Close()
 | 
			
		||||
			return ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -544,7 +544,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
 | 
			
		|||
	s.mu.Lock()
 | 
			
		||||
	if s.state == streamDone {
 | 
			
		||||
		s.mu.Unlock()
 | 
			
		||||
		return StreamErrorf(codes.Unknown, "the stream has been done")
 | 
			
		||||
		return streamErrorf(codes.Unknown, "the stream has been done")
 | 
			
		||||
	}
 | 
			
		||||
	if !s.headerOk {
 | 
			
		||||
		writeHeaderFrame = true
 | 
			
		||||
| 
						 | 
				
			
			@ -568,7 +568,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
 | 
			
		|||
		}
 | 
			
		||||
		if err := t.framer.writeHeaders(false, p); err != nil {
 | 
			
		||||
			t.Close()
 | 
			
		||||
			return ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		t.writableChan <- 0
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -642,7 +642,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
 | 
			
		|||
		}
 | 
			
		||||
		if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
 | 
			
		||||
			t.Close()
 | 
			
		||||
			return ConnectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
			return connectionErrorf(true, err, "transport: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if t.framer.adjustNumWriters(-1) == 0 {
 | 
			
		||||
			t.framer.flushWrite()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -162,7 +162,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
 | 
			
		|||
	switch f.Name {
 | 
			
		||||
	case "content-type":
 | 
			
		||||
		if !validContentType(f.Value) {
 | 
			
		||||
			d.setErr(StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
 | 
			
		||||
			d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	case "grpc-encoding":
 | 
			
		||||
| 
						 | 
				
			
			@ -170,7 +170,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
 | 
			
		|||
	case "grpc-status":
 | 
			
		||||
		code, err := strconv.Atoi(f.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			d.setErr(StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
 | 
			
		||||
			d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		d.statusCode = codes.Code(code)
 | 
			
		||||
| 
						 | 
				
			
			@ -181,7 +181,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
 | 
			
		|||
		var err error
 | 
			
		||||
		d.timeout, err = decodeTimeout(f.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
 | 
			
		||||
			d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	case ":path":
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -476,16 +476,16 @@ type ServerTransport interface {
 | 
			
		|||
	Drain()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// StreamErrorf creates an StreamError with the specified error code and description.
 | 
			
		||||
func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
 | 
			
		||||
// streamErrorf creates an StreamError with the specified error code and description.
 | 
			
		||||
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
 | 
			
		||||
	return StreamError{
 | 
			
		||||
		Code: c,
 | 
			
		||||
		Desc: fmt.Sprintf(format, a...),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ConnectionErrorf creates an ConnectionError with the specified error description.
 | 
			
		||||
func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
 | 
			
		||||
// connectionErrorf creates an ConnectionError with the specified error description.
 | 
			
		||||
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
 | 
			
		||||
	return ConnectionError{
 | 
			
		||||
		Desc: fmt.Sprintf(format, a...),
 | 
			
		||||
		temp: temp,
 | 
			
		||||
| 
						 | 
				
			
			@ -522,10 +522,10 @@ func (e ConnectionError) Origin() error {
 | 
			
		|||
 | 
			
		||||
var (
 | 
			
		||||
	// ErrConnClosing indicates that the transport is closing.
 | 
			
		||||
	ErrConnClosing = ConnectionErrorf(true, nil, "transport is closing")
 | 
			
		||||
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
 | 
			
		||||
	// ErrStreamDrain indicates that the stream is rejected by the server because
 | 
			
		||||
	// the server stops accepting new RPCs.
 | 
			
		||||
	ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
 | 
			
		||||
	ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// StreamError is an error that only affects one stream within a connection.
 | 
			
		||||
| 
						 | 
				
			
			@ -542,9 +542,9 @@ func (e StreamError) Error() string {
 | 
			
		|||
func ContextErr(err error) StreamError {
 | 
			
		||||
	switch err {
 | 
			
		||||
	case context.DeadlineExceeded:
 | 
			
		||||
		return StreamErrorf(codes.DeadlineExceeded, "%v", err)
 | 
			
		||||
		return streamErrorf(codes.DeadlineExceeded, "%v", err)
 | 
			
		||||
	case context.Canceled:
 | 
			
		||||
		return StreamErrorf(codes.Canceled, "%v", err)
 | 
			
		||||
		return streamErrorf(codes.Canceled, "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
	panic(fmt.Sprintf("Unexpected error from context packet: %v", err))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -414,7 +414,7 @@ func TestLargeMessageSuspension(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	// Write should not be done successfully due to flow control.
 | 
			
		||||
	err = ct.Write(s, expectedRequestLarge, &Options{Last: true, Delay: false})
 | 
			
		||||
	expectedErr := StreamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
 | 
			
		||||
	expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
 | 
			
		||||
	if err != expectedErr {
 | 
			
		||||
		t.Fatalf("Write got %v, want %v", err, expectedErr)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue