http-add-on/interceptor/handler/upstream.go

88 lines
2.2 KiB
Go

package handler
import (
"errors"
"net/http"
"net/http/httputil"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"github.com/kedacore/http-add-on/interceptor/config"
"github.com/kedacore/http-add-on/pkg/util"
)
var (
errNilStream = errors.New("context stream is nil")
)
type Upstream struct {
roundTripper http.RoundTripper
tracingCfg *config.Tracing
shouldFailover bool
}
func NewUpstream(roundTripper http.RoundTripper, tracingCfg *config.Tracing, shouldFailover bool) *Upstream {
return &Upstream{
roundTripper: roundTripper,
tracingCfg: tracingCfg,
shouldFailover: shouldFailover,
}
}
var _ http.Handler = (*Upstream)(nil)
func (uh *Upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = util.RequestWithLoggerWithName(r, "UpstreamHandler")
ctx := r.Context()
if uh.tracingCfg.Enabled {
p := otel.GetTextMapPropagator()
ctx = p.Extract(ctx, propagation.HeaderCarrier(r.Header))
p.Inject(ctx, propagation.HeaderCarrier(w.Header()))
span := trace.SpanFromContext(ctx)
defer span.End()
serviceValAttr := attribute.String("service", "keda-http-interceptor-proxy-upstream")
coldStartValAttr := attribute.String("cold-start", w.Header().Get("X-KEDA-HTTP-Cold-Start"))
span.SetAttributes(serviceValAttr, coldStartValAttr)
}
stream := util.StreamFromContext(ctx)
if uh.shouldFailover {
stream = util.FailoverStreamFromContext(ctx)
}
if stream == nil {
sh := NewStatic(http.StatusInternalServerError, errNilStream)
sh.ServeHTTP(w, r)
return
}
proxy := httputil.NewSingleHostReverseProxy(stream)
superDirector := proxy.Director
proxy.Transport = uh.roundTripper
proxy.Director = func(req *http.Request) {
superDirector(req)
req.URL = stream
req.URL.Path = r.URL.Path
req.URL.RawPath = r.URL.RawPath
req.URL.RawQuery = r.URL.RawQuery
// delete the incoming X-Forwarded-For header so the proxy
// puts its own in. This is also important to prevent IP spoofing
req.Header.Del("X-Forwarded-For ")
}
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
sh := NewStatic(http.StatusBadGateway, err)
sh.ServeHTTP(w, r)
}
proxy.ServeHTTP(w, r)
}