linkerd2/pkg/protohttp/protohttp.go

225 lines
6.6 KiB
Go

package protohttp
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/golang/protobuf/proto"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/status"
kerrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
errorHeader = "linkerd-error"
defaultHTTPErrorStatusCode = http.StatusInternalServerError
contentTypeHeader = "Content-Type"
protobufContentType = "application/octet-stream"
numBytesForMessageLength = 4
)
// HTTPError is an error which indicates the HTTP response contained an error
type HTTPError struct {
Code int
WrappedError error
}
// FlushableResponseWriter wraps a ResponseWriter for use in streaming
// responses, such as Tap.
type FlushableResponseWriter interface {
http.ResponseWriter
http.Flusher
}
// Error satisfies the error interface for HTTPError.
func (e HTTPError) Error() string {
return fmt.Sprintf("HTTP error, status Code [%d] (%v)", e.Code, e.WrappedError)
}
// HTTPRequestToProto converts an HTTP Request to a protobuf request.
func HTTPRequestToProto(req *http.Request, protoRequestOut proto.Message) error {
bytes, err := ioutil.ReadAll(req.Body)
if err != nil {
return HTTPError{
Code: http.StatusBadRequest,
WrappedError: err,
}
}
err = proto.Unmarshal(bytes, protoRequestOut)
if err != nil {
return HTTPError{
Code: http.StatusBadRequest,
WrappedError: err,
}
}
return nil
}
// WriteErrorToHTTPResponse writes a protobuf-encoded error to an HTTP Response.
func WriteErrorToHTTPResponse(w http.ResponseWriter, errorObtained error) {
statusCode := defaultHTTPErrorStatusCode
errorToReturn := errorObtained
if httpErr, ok := errorObtained.(HTTPError); ok {
statusCode = httpErr.Code
errorToReturn = httpErr.WrappedError
}
w.Header().Set(errorHeader, http.StatusText(statusCode))
errorMessageToReturn := errorToReturn.Error()
if grpcError, ok := status.FromError(errorObtained); ok {
errorMessageToReturn = grpcError.Message()
}
errorAsProto := &pb.ApiError{Error: errorMessageToReturn}
err := WriteProtoToHTTPResponse(w, errorAsProto)
if err != nil {
log.Errorf("Error writing error to http response: %v", err)
w.Header().Set(errorHeader, err.Error())
}
}
// WriteProtoToHTTPResponse writes a protobuf-encoded message to an HTTP
// Response.
func WriteProtoToHTTPResponse(w http.ResponseWriter, msg proto.Message) error {
w.Header().Set(contentTypeHeader, protobufContentType)
marshalledProtobufMessage, err := proto.Marshal(msg)
if err != nil {
return err
}
fullPayload := SerializeAsPayload(marshalledProtobufMessage)
_, err = w.Write(fullPayload)
return err
}
// NewStreamingWriter takes a ResponseWriter and returns it wrapped in a
// FlushableResponseWriter.
func NewStreamingWriter(w http.ResponseWriter) (FlushableResponseWriter, error) {
flushableWriter, ok := w.(FlushableResponseWriter)
if !ok {
return nil, fmt.Errorf("streaming not supported by this writer")
}
flushableWriter.Header().Set("Connection", "keep-alive")
flushableWriter.Header().Set("Transfer-Encoding", "chunked")
return flushableWriter, nil
}
// SerializeAsPayload appends a 4-byte length in front of a byte slice.
func SerializeAsPayload(messageContentsInBytes []byte) []byte {
lengthOfThePayload := uint32(len(messageContentsInBytes))
messageLengthInBytes := make([]byte, numBytesForMessageLength)
binary.LittleEndian.PutUint32(messageLengthInBytes, lengthOfThePayload)
return append(messageLengthInBytes, messageContentsInBytes...)
}
func deserializePayloadFromReader(reader *bufio.Reader) ([]byte, error) {
messageLengthAsBytes := make([]byte, numBytesForMessageLength)
_, err := io.ReadFull(reader, messageLengthAsBytes)
if err != nil {
return nil, fmt.Errorf("error while reading message length: %v", err)
}
messageLength := int(binary.LittleEndian.Uint32(messageLengthAsBytes))
messageContentsAsBytes := make([]byte, messageLength)
_, err = io.ReadFull(reader, messageContentsAsBytes)
if err != nil {
return nil, fmt.Errorf("error while reading bytes from message: %v", err)
}
return messageContentsAsBytes, nil
}
// CheckIfResponseHasError checks an HTTP Response for errors and returns error
// information with the following precedence:
// 1. "linkerd-error" header, with protobuf-encoded apiError
// 2. non-200 Status Code, with Kubernetes StatusError
// 3. non-200 Status Code
func CheckIfResponseHasError(rsp *http.Response) error {
// check for protobuf-encoded error
errorMsg := rsp.Header.Get(errorHeader)
if errorMsg != "" {
reader := bufio.NewReader(rsp.Body)
var apiError pb.ApiError
err := FromByteStreamToProtocolBuffers(reader, &apiError)
if err != nil {
return fmt.Errorf("Response has %s header [%s], but response body didn't contain protobuf error: %v", errorHeader, errorMsg, err)
}
return errors.New(apiError.Error)
}
// check for JSON-encoded error
if rsp.StatusCode != http.StatusOK {
if rsp.Body != nil {
bytes, err := ioutil.ReadAll(rsp.Body)
if err == nil && len(bytes) > 0 {
body := string(bytes)
obj, err := k8s.ToRuntimeObject(body)
if err == nil {
return HTTPError{Code: rsp.StatusCode, WrappedError: kerrors.FromObject(obj)}
}
body = fmt.Sprintf("unexpected API response: %s", body)
return HTTPError{Code: rsp.StatusCode, WrappedError: errors.New(body)}
}
}
return HTTPError{Code: rsp.StatusCode, WrappedError: errors.New("unexpected API response")}
}
return nil
}
// FromByteStreamToProtocolBuffers converts a byte stream to a protobuf message.
func FromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader, out proto.Message) error {
messageAsBytes, err := deserializePayloadFromReader(byteStreamContainingMessage)
if err != nil {
return fmt.Errorf("error reading byte stream header: %v", err)
}
err = proto.Unmarshal(messageAsBytes, out)
if err != nil {
return fmt.Errorf("error unmarshalling array of [%d] bytes error: %v", len(messageAsBytes), err)
}
return nil
}
// TapReqToURL converts a TapByResourceRequest protobuf object to a URL for use
// with the Kubernetes tap.linkerd.io APIService.
// TODO: Move this, probably into its own package, when /controller/gen/public
// moves into /pkg.
func TapReqToURL(req *pb.TapByResourceRequest) string {
res := req.GetTarget().GetResource()
// non-namespaced
if res.GetType() == k8s.Namespace {
return fmt.Sprintf(
"/apis/tap.linkerd.io/v1alpha1/watch/namespaces/%s/tap",
res.GetName(),
)
}
// namespaced
return fmt.Sprintf(
"/apis/tap.linkerd.io/v1alpha1/watch/namespaces/%s/%s/%s/tap",
res.GetNamespace(), res.GetType()+"s", res.GetName(),
)
}