linkerd2/controller/webhook/server.go

149 lines
4.0 KiB
Go

package webhook
import (
"context"
"crypto/tls"
"encoding/json"
"io/ioutil"
"net/http"
"github.com/linkerd/linkerd2/controller/k8s"
pkgTls "github.com/linkerd/linkerd2/pkg/tls"
log "github.com/sirupsen/logrus"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/yaml"
)
type handlerFunc func(context.Context, *k8s.API, *admissionv1beta1.AdmissionRequest, record.EventRecorder) (*admissionv1beta1.AdmissionResponse, error)
// Server describes the https server implementing the webhook
type Server struct {
*http.Server
api *k8s.API
handler handlerFunc
recorder record.EventRecorder
}
// NewServer returns a new instance of Server
func NewServer(api *k8s.API, addr string, cred *pkgTls.Cred, handler handlerFunc, component string) (*Server, error) {
var (
certPEM = cred.EncodePEM()
keyPEM = cred.EncodePrivateKeyPEM()
)
cert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
if err != nil {
return nil, err
}
server := &http.Server{
Addr: addr,
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
},
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
// In order to send events to all namespaces, we need to use an empty string here
// re: client-go's event_expansion.go CreateWithEventNamespace()
Interface: api.Client.CoreV1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
s := &Server{server, api, handler, recorder}
s.Handler = http.HandlerFunc(s.serve)
return s, nil
}
// Start starts the https server
func (s *Server) Start() {
log.Infof("listening at %s", s.Server.Addr)
if err := s.ListenAndServeTLS("", ""); err != nil {
if err == http.ErrServerClosed {
return
}
log.Fatal(err)
}
}
func (s *Server) serve(res http.ResponseWriter, req *http.Request) {
var (
data []byte
err error
)
if req.Body != nil {
data, err = ioutil.ReadAll(req.Body)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
return
}
}
if len(data) == 0 {
log.Warn("received empty payload")
return
}
response := s.processReq(req.Context(), data)
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
return
}
if _, err := res.Write(responseJSON); err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Server) processReq(ctx context.Context, data []byte) *admissionv1beta1.AdmissionReview {
admissionReview, err := decode(data)
if err != nil {
log.Errorf("failed to decode data. Reason: %s", err)
admissionReview.Response = &admissionv1beta1.AdmissionResponse{
UID: admissionReview.Request.UID,
Allowed: false,
Result: &metav1.Status{
Message: err.Error(),
},
}
return admissionReview
}
log.Infof("received admission review request %s", admissionReview.Request.UID)
log.Debugf("admission request: %+v", admissionReview.Request)
admissionResponse, err := s.handler(ctx, s.api, admissionReview.Request, s.recorder)
if err != nil {
log.Error("failed to inject sidecar. Reason: ", err)
admissionReview.Response = &admissionv1beta1.AdmissionResponse{
UID: admissionReview.Request.UID,
Allowed: false,
Result: &metav1.Status{
Message: err.Error(),
},
}
return admissionReview
}
admissionReview.Response = admissionResponse
return admissionReview
}
// Shutdown initiates a graceful shutdown of the underlying HTTP server.
func (s *Server) Shutdown(ctx context.Context) error {
return s.Server.Shutdown(ctx)
}
func decode(data []byte) (*admissionv1beta1.AdmissionReview, error) {
var admissionReview admissionv1beta1.AdmissionReview
err := yaml.Unmarshal(data, &admissionReview)
return &admissionReview, err
}