feat(backend): Create a validating webhook for the PipelineVersion kind (#11774)

* Create a validating webhook for the PipelineVersion kind

Signed-off-by: VaniHaripriya <vmudadla@redhat.com>

* Fix the manifests for the validating webhook

This fixes deployment and local development for the Kubernetes native
API manifests.

This also addresses other feedback.

Signed-off-by: mprahl <mprahl@users.noreply.github.com>

---------

Signed-off-by: VaniHaripriya <vmudadla@redhat.com>
Signed-off-by: mprahl <mprahl@users.noreply.github.com>
Co-authored-by: VaniHaripriya <vmudadla@redhat.com>
This commit is contained in:
Matt Prahl 2025-03-31 07:23:31 -04:00 committed by GitHub
parent e9f5b5aee2
commit 2efcde5efd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 951 additions and 106 deletions

View File

@ -25,6 +25,7 @@ if [[ ! -d "$C_DIR" ]]; then C_DIR="$PWD"; fi
source "${C_DIR}/helper-functions.sh"
kubectl apply -k "manifests/kustomize/cluster-scoped-resources/"
kubectl apply -k "manifests/kustomize/base/crds"
kubectl wait crd/applications.app.k8s.io --for condition=established --timeout=60s || EXIT_CODE=$?
if [[ $EXIT_CODE -ne 0 ]]
then
@ -32,6 +33,14 @@ then
exit $EXIT_CODE
fi
#Install cert-manager
make -C ./backend install-cert-manager || EXIT_CODE=$?
if [[ $EXIT_CODE -ne 0 ]]
then
echo "Failed to deploy cert-manager."
exit $EXIT_CODE
fi
# Deploy manifest
TEST_MANIFESTS=".github/resources/manifests/tekton"
kubectl apply -k "${TEST_MANIFESTS}" || EXIT_CODE=$?

View File

@ -25,6 +25,7 @@ if [[ ! -d "$C_DIR" ]]; then C_DIR="$PWD"; fi
source "${C_DIR}/helper-functions.sh"
kubectl apply -k "manifests/kustomize/cluster-scoped-resources/"
kubectl apply -k "manifests/kustomize/base/crds"
kubectl wait crd/applications.app.k8s.io --for condition=established --timeout=60s || EXIT_CODE=$?
if [[ $EXIT_CODE -ne 0 ]]
then
@ -32,6 +33,14 @@ then
exit $EXIT_CODE
fi
#Install cert-manager
make -C ./backend install-cert-manager || EXIT_CODE=$?
if [[ $EXIT_CODE -ne 0 ]]
then
echo "Failed to deploy cert-manager."
exit $EXIT_CODE
fi
# Deploy manifest
TEST_MANIFESTS=".github/resources/manifests/argo"
kubectl apply -k "${TEST_MANIFESTS}" || EXIT_CODE=$?

View File

@ -3,6 +3,8 @@ MOD_ROOT=..
CSV_PATH=backend/third_party_licenses
KIND_NAME ?= dev-pipelines-api
CERT_MANAGER_VERSION ?= v1.16.2
# Container Build Params
CONTAINER_ENGINE ?= docker
# IMG_REGISTRY can be used to automatically prepend registry details. e.g. "quay.io/kubeflow/"
@ -15,6 +17,7 @@ IMG_TAG_VIEWERCONTROLLER ?= viewercontroller
IMG_TAG_VISUALIZATION ?= visualization
IMG_TAG_DRIVER ?= kfp-driver
IMG_TAG_LAUNCHER ?= kfp-launcher
IMG_TAG_WEBHOOK_PROXY ?= domain.local/kfp/webhook-proxy:latest
# Whenever build command for any of the binaries change, we should update them both here and in backend/Dockerfiles.
@ -63,14 +66,24 @@ image_driver_debug:
image_launcher:
cd $(MOD_ROOT) && ${CONTAINER_ENGINE} build --platform linux/amd64 -t ${IMG_REGISTRY}${IMG_TAG_LAUNCHER} -f backend/Dockerfile.launcher .
.PHONY: install-cert-manager
install-cert-manager:
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/$(CERT_MANAGER_VERSION)/cert-manager.yaml
kubectl wait deployment -n cert-manager cert-manager --for condition=Available=True --timeout=180s
kubectl wait --for=condition=Ready pod -l app.kubernetes.io/instance=cert-manager -n cert-manager --timeout=180s
.PHONY: dev-kind-cluster
dev-kind-cluster:
${CONTAINER_ENGINE} build -t ${IMG_TAG_WEBHOOK_PROXY} -f $(CURDIR)/../tools/kind/Dockerfile.webhook-proxy $(CURDIR)/../tools/kind
-kind create cluster --name $(KIND_NAME) --config $(CURDIR)/../tools/kind/kind-config.yaml
kubectl config use-context kind-$(KIND_NAME)
kind get kubeconfig --name $(KIND_NAME) > $(CURDIR)/../kubeconfig_$(KIND_NAME)
kind --name $(KIND_NAME) load docker-image ${IMG_TAG_WEBHOOK_PROXY}
$(MAKE) install-cert-manager
kubectl apply -k $(CURDIR)/../manifests/kustomize/cluster-scoped-resources
kubectl wait --for condition=established --timeout=1m crd/applications.app.k8s.io
kubectl apply -k $(CURDIR)/../manifests/kustomize/env/dev-kind
kubectl apply -f $(CURDIR)/../tools/kind/webhook-proxy.yaml
kubectl -n kubeflow wait --for condition=Available --timeout=10m deployment/mysql
kubectl -n kubeflow wait --for condition=Available --timeout=3m deployment/metadata-grpc-deployment

View File

@ -288,6 +288,14 @@ without a breakpoint so that Delve will continue execution until the Driver pod
point, you can set a break point, port forward, and connect to the remote debug session to debug that specific Driver
pod.
### Using a Webhook Proxy for Local Development in a Kind Cluster
The Kubeflow Pipelines API server typically runs over HTTPS when deployed in a Kubernetes cluster. However, during local development, it operates over HTTP, which Kubernetes admission webhooks do not support (they require HTTPS). This incompatibility prevents webhooks from functioning correctly in a local Kind cluster.
To resolve this, a webhook proxy acts as a bridge, allowing webhooks to communicate with the API server even when it runs over HTTP.
This is used by default when using the `dev-kind-cluster` Make target.
### Deleting the Kind Cluster
Run the following to delete the cluster (once you are finished):

View File

@ -22,10 +22,10 @@ import (
"github.com/kubeflow/pipelines/backend/src/common/util"
swfclientset "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned"
swfinformers "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/informers/externalversions"
"github.com/kubeflow/pipelines/backend/src/crd/pkg/signals"
log "github.com/sirupsen/logrus"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
var (
@ -76,7 +76,7 @@ func main() {
flag.Parse()
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
stopCh := signals.SetupSignalHandler().Done()
// Use the util to store the ExecutionType
util.SetExecutionType(util.ExecutionType(executionType))

View File

@ -33,7 +33,10 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
"github.com/minio/minio-go/v6"
"k8s.io/apimachinery/pkg/runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
const (
@ -71,6 +74,18 @@ const (
clientBurst = "ClientBurst"
)
var scheme *runtime.Scheme
func init() {
scheme = runtime.NewScheme()
err := k8sapi.AddToScheme(scheme)
if err != nil {
// Panic is okay here because it means there's a code issue and so the package shouldn't initialize.
panic(fmt.Sprintf("Failed to initialize the Kubernetes API scheme: %v", err))
}
}
// Container for all service clients.
type ClientManager struct {
db *storage.DB
@ -92,6 +107,7 @@ type ClientManager struct {
time util.TimeInterface
uuid util.UUIDGeneratorInterface
authenticators []auth.Authenticator
controllerClient ctrlclient.Client
}
func (c *ClientManager) TaskStore() storage.TaskStoreInterface {
@ -166,7 +182,31 @@ func (c *ClientManager) Authenticators() []auth.Authenticator {
return c.authenticators
}
func (c *ClientManager) init() {
func (c *ClientManager) ControllerClient() ctrlclient.Client {
return c.controllerClient
}
func (c *ClientManager) init() error {
glog.Info("Initializing controller client...")
restConfig, err := util.GetKubernetesConfig()
if err != nil {
return fmt.Errorf("failed to initialize the RestConfig: %w", err)
}
controllerClient, err := ctrlclient.New(
restConfig, ctrlclient.Options{Scheme: scheme},
)
if err != nil {
return fmt.Errorf("failed to initialize the controller client: %w", err)
}
c.controllerClient = controllerClient
glog.Info("Controller client initialized successfully.")
if common.IsOnlyKubernetesWebhookMode() {
return nil
}
glog.Info("Initializing client manager")
glog.Info("Initializing DB client...")
db := InitDBClient(common.GetDurationConfig(initConnectionTimeout))
@ -214,6 +254,8 @@ func (c *ClientManager) init() {
c.authenticators = auth.GetAuthenticators(c.tokenReviewClient)
}
glog.Infof("Client manager initialized successfully")
return nil
}
func (c *ClientManager) Close() {
@ -531,11 +573,14 @@ func initLogArchive() (logArchive archive.LogArchiveInterface) {
}
// NewClientManager creates and Init a new instance of ClientManager.
func NewClientManager() ClientManager {
func NewClientManager() (ClientManager, error) {
clientManager := ClientManager{}
clientManager.init()
err := clientManager.init()
if err != nil {
return ClientManager{}, err
}
return clientManager
return clientManager, nil
}
// Data migration in 2 steps to introduce pipeline_versions table. This

View File

@ -32,6 +32,7 @@ const (
KubeflowUserIDPrefix string = "KUBEFLOW_USERID_PREFIX"
UpdatePipelineVersionByDefault string = "AUTO_UPDATE_PIPELINE_DEFAULT_VERSION"
TokenReviewAudience string = "TOKEN_REVIEW_AUDIENCE"
GlobalKubernetesWebhookMode string = "GLOBAL_KUBERNETES_WEBHOOK_MODE"
)
func IsPipelineVersionUpdatedByDefault() bool {
@ -127,3 +128,7 @@ func GetKubeflowUserIDPrefix() string {
func GetTokenReviewAudience() string {
return GetStringConfigWithDefault(TokenReviewAudience, DefaultTokenReviewAudience)
}
func IsOnlyKubernetesWebhookMode() bool {
return GetBoolConfigWithDefault(GlobalKubernetesWebhookMode, false)
}

View File

@ -16,10 +16,12 @@ package common
import (
"fmt"
"os"
"regexp"
"strings"
"github.com/kubeflow/pipelines/backend/src/common/util"
"go.uber.org/zap/zapcore"
)
const (
@ -117,3 +119,27 @@ func ValidatePipelineName(pipelineName string) error {
}
return nil
}
func ParseLogLevel(logLevel string) (zapcore.Level, error) {
switch logLevel {
case "debug":
return zapcore.DebugLevel, nil
case "info":
return zapcore.InfoLevel, nil
case "warn":
return zapcore.WarnLevel, nil
case "error":
return zapcore.ErrorLevel, nil
case "panic":
return zapcore.PanicLevel, nil
case "fatal":
return zapcore.FatalLevel, nil
default:
return zapcore.Level(0), fmt.Errorf("could not translate log level to ZAP levels: %s", logLevel)
}
}
func FileExists(filePath string) bool {
_, err := os.Stat(filePath)
return !os.IsNotExist(err)
}

View File

@ -16,7 +16,17 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"io"
"math"
"net"
"net/http"
"strconv"
"strings"
"sync"
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"github.com/gorilla/mux"
@ -29,19 +39,18 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/apiserver/server"
"github.com/kubeflow/pipelines/backend/src/apiserver/template"
"github.com/kubeflow/pipelines/backend/src/apiserver/webhook"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"io"
"math"
"net"
"net/http"
"strconv"
"strings"
"sync"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
const (
@ -50,12 +59,16 @@ const (
)
var (
logLevelFlag = flag.String("logLevel", "", "Defines the log level for the application.")
rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
configPath = flag.String("config", "", "Path to JSON file containing config")
sampleConfigPath = flag.String("sampleconfig", "", "Path to samples")
collectMetricsFlag = flag.Bool("collectMetricsFlag", true, "Whether to collect Prometheus metrics in API server.")
logLevelFlag = flag.String("logLevel", "", "Defines the log level for the application.")
rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
webhookPortFlag = flag.String("webhookPortFlag", ":8443", "Https Proxy Port")
webhookTLSCertPath = flag.String("webhookTLSCertPath", "", "Path to the webhook TLS certificate.")
webhookTLSKeyPath = flag.String("webhookTLSKeyPath", "", "Path to the webhook TLS private key.")
configPath = flag.String("config", "", "Path to JSON file containing config")
sampleConfigPath = flag.String("sampleconfig", "", "Path to samples")
collectMetricsFlag = flag.Bool("collectMetricsFlag", true, "Whether to collect Prometheus metrics in API server.")
usePipelinesKubernetesStorage = flag.Bool("pipelinesStoreKubernetes", false, "Store and run pipeline versions in Kubernetes")
)
type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
@ -72,23 +85,6 @@ func main() {
template.Launcher = common.GetStringConfig(launcherEnv)
}
clientManager := cm.NewClientManager()
resourceManager := resource.NewResourceManager(
&clientManager,
&resource.ResourceManagerOptions{CollectMetrics: *collectMetricsFlag},
)
err := config.LoadSamples(resourceManager, *sampleConfigPath)
if err != nil {
glog.Fatalf("Failed to load samples. Err: %v", err)
}
if !common.IsMultiUserMode() {
_, err = resourceManager.CreateDefaultExperiment("")
if err != nil {
glog.Fatalf("Failed to create default experiment. Err: %v", err)
}
}
logLevel := *logLevelFlag
if logLevel == "" {
logLevel = "info"
@ -100,16 +96,73 @@ func main() {
}
log.SetLevel(level)
backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
zapLevel, err := common.ParseLogLevel(logLevel)
if err != nil {
glog.Infof("%v. Defaulting to info level.", err)
zapLevel = zapcore.InfoLevel
}
ctrllog.SetLogger(zap.New(zap.UseFlagOptions(&zap.Options{Level: zapLevel})))
clientManager, err := cm.NewClientManager()
if err != nil {
glog.Fatalf("Failed to initialize ClientManager: %v", err)
}
defer clientManager.Close()
backgroundCtx, backgroundCancel := context.WithCancel(signals.SetupSignalHandler())
defer backgroundCancel()
wg := sync.WaitGroup{}
webhookOnlyMode := common.IsOnlyKubernetesWebhookMode()
if *usePipelinesKubernetesStorage || webhookOnlyMode {
wg.Add(1)
webhookServer, err := startWebhook(clientManager.ControllerClient(), &wg)
if err != nil {
glog.Fatalf("Failed to start Kubernetes webhook server: %v", err)
}
go func() {
<-backgroundCtx.Done()
glog.Info("Shutting down Kubernetes webhook server...")
if err := webhookServer.Shutdown(context.Background()); err != nil {
glog.Errorf("Error shutting down webhook server: %v", err)
}
}()
// This mode is used when there are multiple Kubeflow Pipelines installations on the same cluster but the
// administrator only wants to have a single webhook endpoint for all of them. This causes only the webhook
// endpoints to be available.
if webhookOnlyMode {
wg.Wait()
return
}
}
resourceManager := resource.NewResourceManager(
&clientManager,
&resource.ResourceManagerOptions{CollectMetrics: *collectMetricsFlag},
)
err = config.LoadSamples(resourceManager, *sampleConfigPath)
if err != nil {
glog.Fatalf("Failed to load samples. Err: %v", err)
}
if !common.IsMultiUserMode() {
_, err = resourceManager.CreateDefaultExperiment("")
if err != nil {
glog.Fatalf("Failed to create default experiment. Err: %v", err)
}
}
wg.Add(1)
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
go startRpcServer(resourceManager)
// This is blocking
startHttpProxy(resourceManager)
backgroundCancel()
clientManager.Close()
wg.Wait()
}
@ -239,6 +292,50 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {
glog.Info("Http Proxy started")
}
func startWebhook(controllerClient ctrlclient.Client, wg *sync.WaitGroup) (*http.Server, error) {
glog.Info("Starting the Kubernetes webhooks...")
tlsCertPath := *webhookTLSCertPath
tlsKeyPath := *webhookTLSKeyPath
topMux := mux.NewRouter()
pvValidateWebhook, err := webhook.NewPipelineVersionWebhook(controllerClient)
if err != nil {
return nil, fmt.Errorf("failed to instantiate the Kubernetes webhook: %v", err)
}
topMux.Handle("/webhooks/validate-pipelineversion", pvValidateWebhook)
webhookServer := &http.Server{
Addr: *webhookPortFlag,
Handler: topMux,
}
go func() {
defer wg.Done()
if tlsCertPath != "" && tlsKeyPath != "" {
if !common.FileExists(tlsCertPath) || !common.FileExists(tlsKeyPath) {
glog.Fatalf("TLS certificate/key paths are set but files do not exist")
}
glog.Info("Starting the Kubernetes webhook with TLS")
err := webhookServer.ListenAndServeTLS(tlsCertPath, tlsKeyPath)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
glog.Fatalf("Failed to start the Kubernetes webhook with TLS: %v", err)
}
return
}
glog.Warning("TLS certificate/key paths are not set. Starting webhook server without TLS.")
err := webhookServer.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
glog.Fatalf("Failed to start Kubernetes webhook server: %v", err)
}
}()
return webhookServer, nil
}
func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, serviceName string, ctx context.Context, mux *runtime.ServeMux) {
endpoint := "localhost" + *rpcPortFlag
opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}

View File

@ -0,0 +1,132 @@
/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/template"
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrladmission "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
var scheme *runtime.Scheme
func init() {
scheme = runtime.NewScheme()
err := k8sapi.AddToScheme(scheme)
if err != nil {
// Panic is okay here because it means there's a code issue and so the package shouldn't initialize.
panic(fmt.Sprintf("Failed to initialize the Kubernetes API scheme: %v", err))
}
}
type PipelineVersionsWebhook struct {
Client ctrlclient.Client
}
var _ ctrladmission.CustomValidator = &PipelineVersionsWebhook{}
func newBadRequestError(msg string) *apierrors.StatusError {
return &apierrors.StatusError{
ErrStatus: metav1.Status{
Code: http.StatusBadRequest,
Reason: metav1.StatusReasonBadRequest,
Message: msg,
},
}
}
func (p *PipelineVersionsWebhook) ValidateCreate(
ctx context.Context, obj runtime.Object,
) (warnings ctrladmission.Warnings, err error) {
pipelineVersion, ok := obj.(*k8sapi.PipelineVersion)
if !ok {
return nil, newBadRequestError(fmt.Sprintf("Expected a PipelineVersion object but got %T", pipelineVersion))
}
pipeline := &k8sapi.Pipeline{}
err = p.Client.Get(
ctx, types.NamespacedName{Namespace: pipelineVersion.Namespace, Name: pipelineVersion.Spec.PipelineName}, pipeline,
)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, newBadRequestError("The spec.pipelineName doesn't map to an existing Pipeline object")
}
return nil, err
}
pipelineSpec, err := json.Marshal(pipelineVersion.Spec.PipelineSpec.Value)
if err != nil {
return nil, newBadRequestError(fmt.Sprintf("The pipeline spec is invalid JSON: %v", err))
}
tmpl, err := template.NewV2SpecTemplate(pipelineSpec)
if err != nil {
return nil, newBadRequestError(fmt.Sprintf("The pipeline spec is invalid: %v", err))
}
err = common.ValidatePipelineName(tmpl.V2PipelineName())
if err != nil {
return nil, newBadRequestError(err.Error())
}
return nil, nil
}
func (p *PipelineVersionsWebhook) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) (ctrladmission.Warnings, error) {
oldPipelineVersion, ok := oldObj.(*k8sapi.PipelineVersion)
if !ok {
return nil, newBadRequestError(fmt.Sprintf("Expected a PipelineVersion but got %T", oldObj))
}
newPipelineVersion, ok := newObj.(*k8sapi.PipelineVersion)
if !ok {
return nil, newBadRequestError(fmt.Sprintf("Expected a PipelineVersion but got %T", newObj))
}
if oldPipelineVersion.Generation != newPipelineVersion.Generation {
return nil, newBadRequestError("Pipeline spec is immutable; only metadata changes (labels/annotations) are allowed")
}
return nil, nil
}
// ValidateDelete is unused but required to implement the ctrladmission.CustomValidator interface.
func (p *PipelineVersionsWebhook) ValidateDelete(_ context.Context, _ runtime.Object) (ctrladmission.Warnings, error) {
return nil, nil
}
func NewPipelineVersionWebhook(client ctrlclient.Client) (http.Handler, error) {
validating, err := ctrladmission.StandaloneWebhook(
ctrladmission.WithCustomValidator(scheme, &k8sapi.PipelineVersion{}, &PipelineVersionsWebhook{Client: client}),
ctrladmission.StandaloneOptions{},
)
if err != nil {
return nil, err
}
return validating, nil
}

View File

@ -0,0 +1,209 @@
/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"context"
"encoding/json"
"testing"
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func setupPipelineWebhookTest(t *testing.T) (*PipelineVersionsWebhook, string) {
scheme := runtime.NewScheme()
require.NoError(t, k8sapi.AddToScheme(scheme))
fakeClient := k8sfake.NewClientBuilder().
WithScheme(scheme).
WithObjects(&k8sapi.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline",
Namespace: "default",
},
}).Build()
pipelineWebhook := &PipelineVersionsWebhook{Client: fakeClient}
validPipelineSpec := map[string]interface{}{
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline-v1",
"description": "A simple test pipeline",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-2.11.0",
}
validPipelineSpecJSON, err := json.Marshal(validPipelineSpec)
require.NoError(t, err, "Failed to marshal pipeline spec")
return pipelineWebhook, string(validPipelineSpecJSON)
}
func TestPipelineVersionWebhook_ValidateCreate(t *testing.T) {
pipelineWebhook, validPipelineSpecJSON := setupPipelineWebhookTest(t)
pipelineVersion := &k8sapi.PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline-v1",
Namespace: "default",
},
Spec: k8sapi.PipelineVersionSpec{
PipelineName: "test-pipeline",
PipelineSpec: k8sapi.PipelineIRSpec{
Value: json.RawMessage(validPipelineSpecJSON),
},
},
}
_, err := pipelineWebhook.ValidateCreate(context.TODO(), pipelineVersion)
assert.NoError(t, err, "Expected no error for a valid PipelineVersion")
}
func TestPipelineVersionWebhook_ValidateCreate_InvalidObjectType(t *testing.T) {
pipelineWebhook, _ := setupPipelineWebhookTest(t)
_, err := pipelineWebhook.ValidateCreate(context.TODO(), &k8sapi.Pipeline{})
assert.Error(t, err, "Expected error when passing an object that is not a PipelineVersion")
assert.Contains(t, err.Error(), "Expected a PipelineVersion object")
}
func TestPipelineVersionWebhook_ValidateCreate_InvalidPipelineSpec(t *testing.T) {
pipelineWebhook, _ := setupPipelineWebhookTest(t)
invalidPipelineSpec := map[string]interface{}{
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline-v1",
"description": "A simple test pipeline",
},
}
invalidPipelineSpecJSON, err := json.Marshal(invalidPipelineSpec)
require.NoError(t, err, "Failed to marshal pipeline spec")
invalidPipelineVersion := &k8sapi.PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline-v1",
Namespace: "default",
},
Spec: k8sapi.PipelineVersionSpec{
PipelineName: "test-pipeline",
PipelineSpec: k8sapi.PipelineIRSpec{
Value: json.RawMessage(invalidPipelineSpecJSON),
},
},
}
_, err = pipelineWebhook.ValidateCreate(context.TODO(), invalidPipelineVersion)
assert.Error(t, err, "Expected error for invalid PipelineSpec")
assert.Contains(t, err.Error(), "The pipeline spec is invalid")
}
func TestPipelineVersionWebhook_ValidateUpdate(t *testing.T) {
pipelineWebhook, validPipelineSpecJSON := setupPipelineWebhookTest(t)
oldPipelineVersion := &k8sapi.PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline-v1",
Namespace: "default",
Generation: 1,
},
Spec: k8sapi.PipelineVersionSpec{
PipelineName: "test-pipeline",
PipelineSpec: k8sapi.PipelineIRSpec{
Value: json.RawMessage(validPipelineSpecJSON),
},
},
}
updatedPipelineSpec := map[string]interface{}{
"pipelineInfo": map[string]interface{}{
"name": "test-pipeline-v2",
"description": "Updated pipeline version",
},
"root": map[string]interface{}{
"dag": map[string]interface{}{
"tasks": map[string]interface{}{},
},
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-2.11.0",
}
updatedPipelineSpecJSON, err := json.Marshal(updatedPipelineSpec)
require.NoError(t, err, "Failed to marshal updated pipeline spec")
newPipelineVersion := &k8sapi.PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline-v1",
Namespace: "default",
Generation: 2,
},
Spec: k8sapi.PipelineVersionSpec{
PipelineName: "test-pipeline",
PipelineSpec: k8sapi.PipelineIRSpec{
Value: json.RawMessage(updatedPipelineSpecJSON),
},
},
}
_, err = pipelineWebhook.ValidateUpdate(context.TODO(), oldPipelineVersion, newPipelineVersion)
assert.Error(t, err, "Expected error for modifying pipeline spec")
assert.Contains(t, err.Error(), "Pipeline spec is immutable")
}
func TestPipelineVersionWebhook_ValidateUpdate_MetadataChangeAllowed(t *testing.T) {
pipelineWebhook, validPipelineSpecJSON := setupPipelineWebhookTest(t)
oldPipelineVersion := &k8sapi.PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline-v1",
Namespace: "default",
Generation: 1,
Labels: map[string]string{"version": "v1"},
},
Spec: k8sapi.PipelineVersionSpec{
PipelineName: "test-pipeline",
PipelineSpec: k8sapi.PipelineIRSpec{
Value: json.RawMessage(validPipelineSpecJSON),
},
},
}
newPipelineVersion := &k8sapi.PipelineVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline-v1",
Namespace: "default",
Generation: 1,
Labels: map[string]string{"version": "v2"},
},
Spec: k8sapi.PipelineVersionSpec{
PipelineName: "test-pipeline",
PipelineSpec: k8sapi.PipelineIRSpec{
Value: json.RawMessage(validPipelineSpecJSON),
},
},
}
_, err := pipelineWebhook.ValidateUpdate(context.TODO(), oldPipelineVersion, newPipelineVersion)
assert.NoError(t, err, "Expected no error for metadata-only change")
}

View File

@ -26,13 +26,13 @@ import (
"github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util"
swfclientset "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned"
swfinformers "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/informers/externalversions"
"github.com/kubeflow/pipelines/backend/src/crd/pkg/signals"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
var (
@ -59,7 +59,7 @@ func main() {
flag.Parse()
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
stopCh := signals.SetupSignalHandler().Done()
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {

View File

@ -1,41 +0,0 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signals
import (
"os"
"os/signal"
)
var onlyOneSignalHandler = make(chan struct{})
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
close(onlyOneSignalHandler) // panics when called twice
stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // second signal. Exit directly.
}()
return stop
}

View File

@ -1,22 +0,0 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signals
import (
"os"
"syscall"
)
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

3
go.mod
View File

@ -46,6 +46,7 @@ require (
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.9.0
github.com/tektoncd/pipeline v0.54.0
go.uber.org/zap v1.27.0
gocloud.dev v0.40.0
golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.22.0
@ -117,6 +118,7 @@ require (
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/analysis v0.20.1 // indirect
github.com/go-openapi/jsonpointer v0.20.2 // indirect
github.com/go-openapi/jsonreference v0.20.4 // indirect
@ -200,7 +202,6 @@ require (
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.17.0 // indirect

View File

@ -35,6 +35,14 @@ rules:
- update
- patch
- delete
- apiGroups:
- pipelines.kubeflow.org
resources:
- pipelines
verbs:
- get
- list
- watch
- apiGroups:
- authorization.k8s.io
resources:

View File

@ -37,6 +37,14 @@ rules:
- update
- patch
- delete
- apiGroups:
- pipelines.kubeflow.org
resources:
- pipelines
verbs:
- get
- list
- watch
- apiGroups:
- authorization.k8s.io
resources:

View File

@ -0,0 +1,9 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: kubeflow
resources:
- pipelineversion-validating-webhook-config.yaml
configurations:
- params.yaml

View File

@ -0,0 +1,7 @@
varReference:
- path: metadata/annotations
kind: ValidatingWebhookConfiguration
- path: webhooks/clientConfig/service/namespace
kind: ValidatingWebhookConfiguration
- path: webhooks/name
kind: ValidatingWebhookConfiguration

View File

@ -0,0 +1,27 @@
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: pipelineversions.pipelines.kubeflow.org
webhooks:
- name: pipelineversions.pipelines.kubeflow.org
rules:
- operations:
- CREATE
- UPDATE
apiGroups:
- pipelines.kubeflow.org
apiVersions:
- v2beta1
resources:
- pipelineversions
scope: Namespaced
admissionReviewVersions:
- v1
sideEffects: None
failurePolicy: Fail
clientConfig:
service:
name: ml-pipeline
namespace: $(kfp-namespace)
path: /webhooks/validate-pipelineversion
port: 8443

View File

@ -0,0 +1,6 @@
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
name: kfp-api-webhook-selfsigned-issuer
spec:
selfSigned: {}

View File

@ -0,0 +1,15 @@
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: kfp-api-webhook-cert
spec:
commonName: kfp-api-webhook-cert
isCA: true
dnsNames:
- ml-pipeline
- ml-pipeline.$(kfp-namespace)
- ml-pipeline.$(kfp-namespace).svc
issuerRef:
kind: Issuer
name: kfp-api-webhook-selfsigned-issuer
secretName: kfp-api-webhook-cert

View File

@ -0,0 +1,13 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ./kfp-api-cert-issuer.yaml
- ./kfp-api-cert.yaml
configurations:
- params.yaml
# !!! If you want to customize the namespace,
# please also update base/cache-deployer/cluster-scoped/cache-deployer-clusterrolebinding.yaml
namespace: kubeflow

View File

@ -0,0 +1,7 @@
varReference:
- path: spec/commonName
kind: Certificate
- path: spec/dnsNames
kind: Certificate
- path: spec/issuerRef/name
kind: Certificate

View File

@ -0,0 +1,26 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../../../base/crds
- ../../platform-agnostic
- ../../../base/webhook
- ../base-webhook-certs/
# !!! If you want to customize the namespace,
# please also update base/cache-deployer/cluster-scoped/cache-deployer-clusterrolebinding.yaml
namespace: kubeflow
patches:
- path: patches/deployment.yaml
target:
kind: Deployment
name: ml-pipeline
- path: patches/service.yaml
target:
kind: Service
name: ml-pipeline
- path: patches/validating-webhook.yaml
target:
kind: ValidatingWebhookConfiguration
name: pipelineversions.pipelines.kubeflow.org

View File

@ -0,0 +1,28 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline
spec:
template:
spec:
containers:
- name: ml-pipeline-api-server
ports:
- containerPort: 8443
name: webhook
command:
- "/bin/apiserver"
args:
- "--config=/config"
- "--sampleconfig=/config/sample_config.json"
- "-logtostderr=true"
- "--webhookTLSCertPath=/etc/webhook/certs/tls.crt"
- "--webhookTLSKeyPath=/etc/webhook/certs/tls.key"
volumeMounts:
- name: webhook-certs
mountPath: /etc/webhook/certs
readOnly: true
volumes:
- name: webhook-certs
secret:
secretName: kfp-api-webhook-cert

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: Service
metadata:
name: ml-pipeline
spec:
ports:
- name: webhook
port: 8443
protocol: TCP
targetPort: 8443

View File

@ -0,0 +1,6 @@
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: pipelineversions.pipelines.kubeflow.org
annotations:
cert-manager.io/inject-ca-from: $(kfp-namespace)/kfp-api-webhook-cert

View File

@ -0,0 +1,26 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../../../base/crds
- ../platform-agnostic-multi-user
- ../../../base/webhook
- ../base-webhook-certs/
# !!! If you want to customize the namespace,
# please also update base/cache-deployer/cluster-scoped/cache-deployer-clusterrolebinding.yaml
namespace: kubeflow
patches:
- path: patches/deployment.yaml
target:
kind: Deployment
name: ml-pipeline
- path: patches/service.yaml
target:
kind: Service
name: ml-pipeline
- path: patches/validating-webhook.yaml
target:
kind: ValidatingWebhookConfiguration
name: pipelineversions.pipelines.kubeflow.org

View File

@ -0,0 +1,29 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-pipeline
spec:
template:
spec:
containers:
- name: ml-pipeline-api-server
ports:
- containerPort: 8443
name: webhook
image: domain.local/apiserver:local
command:
- "/bin/apiserver"
args:
- "--config=/config"
- "--sampleconfig=/config/sample_config.json"
- "-logtostderr=true"
- "--webhookTLSCertPath=/etc/webhook/certs/tls.crt"
- "--webhookTLSKeyPath=/etc/webhook/certs/tls.key"
volumeMounts:
- name: webhook-certs
mountPath: /etc/webhook/certs
readOnly: true
volumes:
- name: webhook-certs
secret:
secretName: kfp-api-webhook-cert

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: Service
metadata:
name: ml-pipeline
spec:
ports:
- name: webhook
port: 8443
protocol: TCP
targetPort: 8443

View File

@ -0,0 +1,6 @@
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: pipelineversions.pipelines.kubeflow.org
annotations:
cert-manager.io/inject-ca-from: $(kfp-namespace)/kfp-api-webhook-cert

View File

@ -15,3 +15,7 @@ subsets:
appProtocol: http
port: 8888
protocol: TCP
- name: webhook
appProtocol: http
port: 8443
protocol: TCP

View File

@ -2,9 +2,8 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../../base/application
- ../cert-manager/platform-agnostic-k8s-native
- ./forward-local-api-endpoint.yaml
- ../platform-agnostic
# !!! If you want to customize the namespace,
# please refer sample/cluster-scoped-resources to update the namespace for cluster-scoped-resources
@ -126,5 +125,32 @@ patches:
port: 8887
protocol: TCP
targetPort: 8887
- name: webhook
port: 8443
protocol: TCP
targetPort: 8443
selector:
$patch: delete
- patch: |-
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: pipelineversions.pipelines.kubeflow.org
webhooks:
- name: pipelineversions.pipelines.kubeflow.org
clientConfig:
service:
name: ml-pipeline-reverse-proxy
- patch: |-
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: kfp-api-webhook-cert
spec:
dnsNames:
- ml-pipeline
- ml-pipeline.$(kfp-namespace)
- ml-pipeline.$(kfp-namespace).svc
- ml-pipeline-reverse-proxy
- ml-pipeline-reverse-proxy.$(kfp-namespace)
- ml-pipeline-reverse-proxy.$(kfp-namespace).svc

View File

@ -0,0 +1,10 @@
# This Dockerfile creates a containerized NGINX reverse proxy for handling webhook requests.
# It enables HTTPS communication and forwards requests to the API Server in KIND cluster using a self-signed SSL certificate for secure traffic.
FROM registry.access.redhat.com/ubi9/nginx-124
EXPOSE 8443
ADD webhook-proxy-nginx.conf "${NGINX_CONF_PATH}"
# Run script uses standard ways to run the application
CMD ["nginx", "-g", "daemon off;"]

View File

@ -0,0 +1,27 @@
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;
# Load dynamic modules. See /usr/share/doc/nginx/README.dynamic.
include /usr/share/nginx/modules/*.conf;
events {
worker_connections 1024;
}
http {
server {
listen 8443 ssl;
server_name localhost;
ssl_certificate /tmp/k8s-webhook-server/serving-certs/tls.crt;
ssl_certificate_key /tmp/k8s-webhook-server/serving-certs/tls.key;
location / {
proxy_pass http://ml-pipeline:8443;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}

View File

@ -0,0 +1,46 @@
apiVersion: v1
kind: Service
metadata:
name: ml-pipeline-reverse-proxy
namespace: kubeflow
spec:
ports:
- name: https
port: 8443
protocol: TCP
targetPort: 8443
selector:
app: reverse-webhook
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: reverse-webhook
namespace: kubeflow
spec:
replicas: 1
selector:
matchLabels:
app: reverse-webhook
template:
metadata:
labels:
app: reverse-webhook
spec:
containers:
- name: nginx
image: domain.local/kfp/webhook-proxy:latest
imagePullPolicy: Never
ports:
- name: https
protocol: TCP
containerPort: 8443
volumeMounts:
- mountPath: /tmp/k8s-webhook-server/serving-certs
name: cert
readOnly: true
volumes:
- name: cert
secret:
defaultMode: 420
secretName: kfp-api-webhook-cert