diff --git a/go16.go b/go16.go index 73eb59fda..b1db21af6 100644 --- a/go16.go +++ b/go16.go @@ -57,8 +57,6 @@ func toRPCErr(err error) error { return err } switch e := err.(type) { - case transport.StreamError: - return status.Error(e.Code, e.Desc) case transport.ConnectionError: return status.Error(codes.Unavailable, e.Desc) default: diff --git a/go17.go b/go17.go index 5f9ab8c4e..71a72e8fe 100644 --- a/go17.go +++ b/go17.go @@ -58,8 +58,6 @@ func toRPCErr(err error) error { return err } switch e := err.(type) { - case transport.StreamError: - return status.Error(e.Code, e.Desc) case transport.ConnectionError: return status.Error(codes.Unavailable, e.Desc) default: diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index 0d7cad452..bc8564345 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -80,7 +80,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta if v := r.Header.Get("grpc-timeout"); v != "" { to, err := decodeTimeout(v) if err != nil { - return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err) + return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err) } st.timeoutSet = true st.timeout = to @@ -98,7 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta for _, v := range vv { v, err := decodeMetadataHeader(k, v) if err != nil { - return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err) + return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err) } metakv = append(metakv, k, v) } @@ -432,17 +432,14 @@ func (ht *serverHandlerTransport) Drain() { // * io.EOF // * io.ErrUnexpectedEOF // * of type transport.ConnectionError -// * of type transport.StreamError +// * an error from the status package func mapRecvMsgError(err error) error { if err == io.EOF || err == io.ErrUnexpectedEOF { return err } if se, ok := err.(http2.StreamError); ok { if code, ok := http2ErrConvTab[se.Code]; ok { - return StreamError{ - Code: code, - Desc: se.Error(), - } + return status.Error(code, se.Error()) } } return connectionErrorf(true, err, err.Error()) diff --git a/internal/transport/handler_server_test.go b/internal/transport/handler_server_test.go index 3261b8e3d..da04e7cef 100644 --- a/internal/transport/handler_server_test.go +++ b/internal/transport/handler_server_test.go @@ -179,7 +179,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) { }, RequestURI: "/service/foo.bar", }, - wantErr: `stream error: code = Internal desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`, + wantErr: `rpc error: code = Internal desc = malformed time-out: transport: timeout unit is not recognized: "tomorrow"`, }, { name: "with metadata", @@ -220,7 +220,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) { } got, gotErr := NewServerHandlerTransport(rw, tt.req, nil) if (gotErr != nil) != (tt.wantErr != "") || (gotErr != nil && gotErr.Error() != tt.wantErr) { - t.Errorf("%s: error = %v; want %q", tt.name, gotErr, tt.wantErr) + t.Errorf("%s: error = %q; want %q", tt.name, gotErr.Error(), tt.wantErr) continue } if gotErr != nil { diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 497c78c61..88d1c1612 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -476,7 +476,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s return nil, err } - return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err) + return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err) } for k, v := range data { // Capital header names are illegal in HTTP/2. @@ -494,11 +494,11 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call // options, then both sets of credentials will be applied. if callCreds := callHdr.Creds; callCreds != nil { if !t.isSecure && callCreds.RequireTransportSecurity() { - return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") + return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") } data, err := callCreds.GetRequestMetadata(ctx, audience) if err != nil { - return nil, streamErrorf(codes.Internal, "transport: %v", err) + return nil, status.Errorf(codes.Internal, "transport: %v", err) } for k, v := range data { // Capital header names are illegal in HTTP/2 @@ -611,7 +611,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea var sz int64 for _, f := range hdrFrame.hf { if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { - hdrListSizeErr = streamErrorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) + hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) return false } } @@ -1206,7 +1206,7 @@ func (t *http2Client) reader() { // use error detail to provide better err message code := http2ErrConvTab[se.Code] msg := t.framer.fr.ErrorDetail().Error() - t.closeStream(s, streamError(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) + t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) } continue } else { diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 4a9a6753b..a8a09270b 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -299,11 +299,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( streamID := frame.Header().StreamID state := decodeState{serverSide: true} if err := state.decodeHeader(frame); err != nil { - if se, ok := err.(StreamError); ok { + if se, ok := status.FromError(err); ok { t.controlBuf.put(&cleanupStream{ streamID: streamID, rst: true, - rstCode: statusCodeConvTab[se.Code], + rstCode: statusCodeConvTab[se.Code()], onWrite: func() {}, }) } @@ -863,7 +863,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e if !s.isHeaderSent() { // Headers haven't been written yet. if err := t.WriteHeader(s, nil); err != nil { // TODO(mmukhi, dfawley): Make sure this is the right code to return. - return streamErrorf(codes.Internal, "transport: %v", err) + return status.Errorf(codes.Internal, "transport: %v", err) } } else { // Writing headers checks for this condition. diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index dea43e7ea..21da6e80b 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -241,7 +241,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. if frame.Truncated { - return streamErrorf(codes.Internal, "peer header list size exceeded limit") + return status.Error(codes.Internal, "peer header list size exceeded limit") } for _, hf := range frame.Fields { if err := d.processHeaderField(hf); err != nil { @@ -261,7 +261,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { // If grpc status doesn't exist and http status doesn't exist, // then it's a malformed header. if d.httpStatus == nil { - return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)") + return status.Error(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)") } if *(d.httpStatus) != http.StatusOK { @@ -269,7 +269,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { if !ok { code = codes.Unknown } - return streamErrorf(code, http.StatusText(*(d.httpStatus))) + return status.Error(code, http.StatusText(*(d.httpStatus))) } // gRPC status doesn't exist and http status is OK. @@ -295,7 +295,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { case "content-type": contentSubtype, validContentType := contentSubtype(f.Value) if !validContentType { - return streamErrorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value) + return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value) } d.contentSubtype = contentSubtype // TODO: do we want to propagate the whole content-type in the metadata, @@ -308,7 +308,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { case "grpc-status": code, err := strconv.Atoi(f.Value) if err != nil { - return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err) + return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) } d.rawStatusCode = &code case "grpc-message": @@ -316,38 +316,38 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { case "grpc-status-details-bin": v, err := decodeBinHeader(f.Value) if err != nil { - return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) } s := &spb.Status{} if err := proto.Unmarshal(v, s); err != nil { - return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) } d.statusGen = status.FromProto(s) case "grpc-timeout": d.timeoutSet = true var err error if d.timeout, err = decodeTimeout(f.Value); err != nil { - return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err) + return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err) } case ":path": d.method = f.Value case ":status": code, err := strconv.Atoi(f.Value) if err != nil { - return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err) + return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) } d.httpStatus = &code case "grpc-tags-bin": v, err := decodeBinHeader(f.Value) if err != nil { - return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) } d.statsTags = v d.addMetadata(f.Name, string(v)) case "grpc-trace-bin": v, err := decodeBinHeader(f.Value) if err != nil { - return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) } d.statsTrace = v d.addMetadata(f.Name, string(v)) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index f9116cf40..9775eeb81 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -619,19 +619,6 @@ type ServerTransport interface { IncrMsgRecv() } -// streamErrorf creates an StreamError with the specified error code and description. -func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError { - return StreamError{ - Code: c, - Desc: fmt.Sprintf(format, a...), - } -} - -// streamError creates an StreamError with the specified error code and description. -func streamError(c codes.Code, desc string) StreamError { - return StreamError{Code: c, Desc: desc} -} - // connectionErrorf creates an ConnectionError with the specified error description. func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { return ConnectionError{ @@ -674,7 +661,7 @@ var ( // errStreamDrain indicates that the stream is rejected because the // connection is draining. This could be caused by goaway or balancer // removing the address. - errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining") + errStreamDrain = status.Error(codes.Unavailable, "the connection is draining") // errStreamDone is returned from write at the client side to indiacte application // layer of an error. errStreamDone = errors.New("the stream is done") @@ -683,18 +670,6 @@ var ( statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") ) -// TODO: See if we can replace StreamError with status package errors. - -// StreamError is an error that only affects one stream within a connection. -type StreamError struct { - Code codes.Code - Desc string -} - -func (e StreamError) Error() string { - return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc) -} - // GoAwayReason contains the reason for the GoAway frame received. type GoAwayReason uint8 diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index d83f8267e..52be96f0f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -446,7 +446,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } // TestInflightStreamClosing ensures that closing in-flight stream -// sends StreamError to concurrent stream reader. +// sends status error to concurrent stream reader. func TestInflightStreamClosing(t *testing.T) { serverConfig := &ServerConfig{} server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) @@ -460,7 +460,7 @@ func TestInflightStreamClosing(t *testing.T) { } donec := make(chan struct{}) - serr := StreamError{Desc: "client connection is closing"} + serr := status.Error(codes.Internal, "client connection is closing") go func() { defer close(donec) if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr { @@ -479,7 +479,7 @@ func TestInflightStreamClosing(t *testing.T) { <-timeout.C } case <-timeout.C: - t.Fatalf("Test timed out, expected a StreamError.") + t.Fatalf("Test timed out, expected a status error.") } } @@ -1698,7 +1698,7 @@ func TestInvalidHeaderField(t *testing.T) { } p := make([]byte, http2MaxFrameLen) _, err = s.trReader.(*transportReader).Read(p) - if se, ok := err.(StreamError); !ok || se.Code != codes.Internal || !strings.Contains(err.Error(), expectedInvalidHeaderField) { + if se, ok := status.FromError(err); !ok || se.Code() != codes.Internal || !strings.Contains(err.Error(), expectedInvalidHeaderField) { t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.Internal, expectedInvalidHeaderField) } ct.Close() @@ -2093,12 +2093,12 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) if err == nil { t.Fatalf("Stream.Read(_) unexpectedly returned no error. Expected stream error with code %v", want) } - serr, ok := err.(StreamError) + serr, ok := status.FromError(err) if !ok { - t.Fatalf("err.(Type) = %T, want StreamError", err) + t.Fatalf("err.(Type) = %T, want status error", err) } - if want != serr.Code { - t.Fatalf("Want error code: %v, got: %v", want, serr.Code) + if want != serr.Code() { + t.Fatalf("Want error code: %v, got: %v", want, serr.Code()) } } diff --git a/rpc_util.go b/rpc_util.go index cfd811591..61342c9cd 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -470,7 +470,7 @@ type parser struct { // * io.EOF, when no messages remain // * io.ErrUnexpectedEOF // * of type transport.ConnectionError -// * of type transport.StreamError +// * an error from the status package // No other error values or types must be returned, which also means // that the underlying io.Reader must not return an incompatible // error. diff --git a/rpc_util_test.go b/rpc_util_test.go index a4ff94583..0335fc7d2 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -177,7 +177,6 @@ func TestToRPCErr(t *testing.T) { // outputs errOut error }{ - {transport.StreamError{Code: codes.Unknown, Desc: ""}, status.Error(codes.Unknown, "")}, {transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)}, {io.ErrUnexpectedEOF, status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())}, } { diff --git a/server.go b/server.go index fe83421fa..f5bea7238 100644 --- a/server.go +++ b/server.go @@ -963,10 +963,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. switch st := err.(type) { case transport.ConnectionError: // Nothing to do here. - case transport.StreamError: - if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { - grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) - } default: panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st)) } @@ -1062,10 +1058,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. switch st := err.(type) { case transport.ConnectionError: // Nothing to do here. - case transport.StreamError: - if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { - grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) - } default: panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) } @@ -1185,12 +1177,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if appErr != nil { appStatus, ok := status.FromError(appErr) if !ok { - switch err := appErr.(type) { - case transport.StreamError: - appStatus = status.New(err.Code, err.Desc) - default: - appStatus = status.New(codes.Unknown, appErr.Error()) - } + appStatus = status.New(codes.Unknown, appErr.Error()) appErr = appStatus.Err() } if trInfo != nil {