feat(backend): Add support for importing models stored in the Modelcar format (sidecar) (#11606)

This allows dsl.importer to leverage Modelcar container images stored in
an OCI repository. It works by having an init container pre-pull the
image and then adding a sidecar container that runs alongside the
launcher container. The Modelcar container adds a symlink to its /models
directory in an emptyDir volume that is accessible by the launcher
container. Once the launcher has finished running the user code, it
stops the Modelcar containers.
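
For example, a pipeline can import a Modelcar image with dsl.importer and
consume it like any other dsl.Model artifact. The following is a minimal
sketch based on the sample added under samples/v2/modelcar_import; the image
reference and component below are illustrative placeholders:

from kfp import dsl

@dsl.component
def list_model_files(model: dsl.Input[dsl.Model]) -> str:
    import os

    # The Modelcar's /models directory is exposed to the component at model.path.
    return ", ".join(sorted(os.listdir(model.path)))

@dsl.pipeline(name="modelcar-import-example")
def my_pipeline(model_uri: str = "oci://registry.example.com/my-model:latest"):
    importer_task = dsl.importer(artifact_uri=model_uri, artifact_class=dsl.Model)
    list_model_files(model=importer_task.output)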

This approach has the benefit of leveraging the image pull secrets
configured on the Kubernetes cluster rather than requiring separate
credentials for importing the artifact. Additionally, no data is copied
to the emptyDir volume, so the only storage cost is pulling the Modelcar
container image on the Kubernetes worker node.

Note: once Kubernetes has supported OCI images as volume mounts for
several releases, consider replacing the init container with that
approach.

This also adds a new PIPELINE_RUN_AS_USER environment variable that sets
runAsUser on all Pods created by Argo Workflows (for example, a value of
1001 results in securityContext.runAsUser: 1001 on the compiled
Workflow, as shown in the new test data).

Resolves:
https://github.com/kubeflow/pipelines/issues/11584

Signed-off-by: mprahl <mprahl@users.noreply.github.com>
Matt Prahl 2025-02-19 14:02:42 -05:00 committed by GitHub
parent c0778ba88c
commit cc1c435f1e
15 changed files with 789 additions and 10 deletions


@ -36,11 +36,18 @@ jobs:
        with:
          k8s_version: ${{ matrix.k8s_version }}
      - name: Build and upload the sample Modelcar image to Kind
        run: |
          docker build -f samples/v2/modelcar_import/Dockerfile -t registry.domain.local/modelcar:test .
          kind --name kfp load docker-image registry.domain.local/modelcar:test
      - name: Forward API port
        run: ./.github/resources/scripts/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888
      - name: Run Samples Tests
        id: tests
        env:
          PULL_NUMBER: ${{ github.event.pull_request.number }}
        run: |
          ./backend/src/v2/test/sample-test.sh
        continue-on-error: true


@ -120,6 +120,12 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
			Entrypoint: tmplEntrypoint,
		},
	}

	runAsUser := GetPipelineRunAsUser()
	if runAsUser != nil {
		wf.Spec.SecurityContext = &k8score.PodSecurityContext{RunAsUser: runAsUser}
	}

	c := &workflowCompiler{
		wf:        wf,
		templates: make(map[string]*wfapi.Template),


@ -36,6 +36,7 @@ func Test_argo_compiler(t *testing.T) {
		jobPath          string // path of input PipelineJob to compile
		platformSpecPath string // path of possible input PlatformSpec to compile
		argoYAMLPath     string // path of expected output argo workflow YAML
		envVars          map[string]string
	}{
		{
			jobPath: "../testdata/hello_world.json",
@ -67,9 +68,33 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "",
argoYAMLPath: "testdata/exit_handler.yaml",
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "",
argoYAMLPath: "testdata/hello_world_run_as_user.yaml",
envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
prevEnvVars := map[string]string{}
for envVarName, envVarValue := range tt.envVars {
prevEnvVars[envVarName] = os.Getenv(envVarName)
os.Setenv(envVarName, envVarValue)
}
defer func() {
for envVarName, envVarValue := range prevEnvVars {
if envVarValue == "" {
os.Unsetenv(envVarName)
} else {
os.Setenv(envVarName, envVarValue)
}
}
}()
job, platformSpec := load(t, tt.jobPath, tt.platformSpecPath)
if *update {
wf, err := argocompiler.Compile(job, platformSpec, nil)


@ -17,9 +17,11 @@ package argocompiler
import (
"fmt"
"os"
"strconv"
"strings"
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/component"
@ -36,6 +38,7 @@ const (
	DriverImageEnvVar       = "V2_DRIVER_IMAGE"
	DefaultDriverCommand    = "driver"
	DriverCommandEnvVar     = "V2_DRIVER_COMMAND"
	PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER"
	gcsScratchLocation      = "/gcs"
	gcsScratchName          = "gcs-scratch"
	s3ScratchLocation       = "/s3"
@ -101,6 +104,25 @@ func GetDriverCommand() []string {
	return strings.Split(driverCommand, " ")
}
func GetPipelineRunAsUser() *int64 {
	runAsUserStr := os.Getenv(PipelineRunAsUserEnvVar)
	if runAsUserStr == "" {
		return nil
	}

	runAsUser, err := strconv.ParseInt(runAsUserStr, 10, 64)
	if err != nil {
		glog.Errorf(
			"Failed to parse the %s environment variable with value %s as an int64: %v",
			PipelineRunAsUserEnvVar, runAsUserStr, err,
		)
		return nil
	}

	return &runAsUser
}
func (c *workflowCompiler) containerDriverTask(name string, inputs containerDriverInputs) (*wfapi.DAGTask, *containerDriverOutputs) {
dagTask := &wfapi.DAGTask{
Name: name,


@ -0,0 +1,311 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
creationTimestamp: null
generateName: hello-world-
spec:
arguments:
parameters:
- name: components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7
value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}'
- name: implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7
value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf
\"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def
hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser
= argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\",
dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
= vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.9"}'
- name: components-root
value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}'
entrypoint: entrypoint
podMetadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
labels:
pipelines.kubeflow.org/v2_component: "true"
serviceAccountName: pipeline-runner
securityContext:
runAsUser: 1001
templates:
- container:
args:
- --type
- CONTAINER
- --pipeline_name
- namespace/n1/pipeline/hello-world
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
- '{{inputs.parameters.component}}'
- --task
- '{{inputs.parameters.task}}'
- --container
- '{{inputs.parameters.container}}'
- --iteration_index
- '{{inputs.parameters.iteration-index}}'
- --cached_decision_path
- '{{outputs.parameters.cached-decision.path}}'
- --pod_spec_patch_path
- '{{outputs.parameters.pod-spec-patch.path}}'
- --condition_path
- '{{outputs.parameters.condition.path}}'
- --kubernetes_config
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
name: ""
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 64Mi
inputs:
parameters:
- name: component
- name: task
- name: container
- name: parent-dag-id
- default: "-1"
name: iteration-index
- default: ""
name: kubernetes-config
metadata: {}
name: system-container-driver
outputs:
parameters:
- name: pod-spec-patch
valueFrom:
default: ""
path: /tmp/outputs/pod-spec-patch
- default: "false"
name: cached-decision
valueFrom:
default: "false"
path: /tmp/outputs/cached-decision
- name: condition
valueFrom:
default: "true"
path: /tmp/outputs/condition
- dag:
tasks:
- arguments:
parameters:
- name: pod-spec-patch
value: '{{inputs.parameters.pod-spec-patch}}'
name: executor
template: system-container-impl
when: '{{inputs.parameters.cached-decision}} != true'
inputs:
parameters:
- name: pod-spec-patch
- default: "false"
name: cached-decision
metadata: {}
name: system-container-executor
outputs: {}
- container:
command:
- should-be-overridden-during-runtime
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: KFP_POD_UID
valueFrom:
fieldRef:
fieldPath: metadata.uid
envFrom:
- configMapRef:
name: metadata-grpc-configmap
optional: true
image: gcr.io/ml-pipeline/should-be-overridden-during-runtime
name: ""
resources: {}
volumeMounts:
- mountPath: /kfp-launcher
name: kfp-launcher
- mountPath: /gcs
name: gcs-scratch
- mountPath: /s3
name: s3-scratch
- mountPath: /minio
name: minio-scratch
- mountPath: /.local
name: dot-local-scratch
- mountPath: /.cache
name: dot-cache-scratch
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
name: kfp-launcher
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 100m
volumeMounts:
- mountPath: /kfp-launcher
name: kfp-launcher
inputs:
parameters:
- name: pod-spec-patch
metadata: {}
name: system-container-impl
outputs: {}
podSpecPatch: '{{inputs.parameters.pod-spec-patch}}'
volumes:
- emptyDir: {}
name: kfp-launcher
- emptyDir: {}
name: gcs-scratch
- emptyDir: {}
name: s3-scratch
- emptyDir: {}
name: minio-scratch
- emptyDir: {}
name: dot-local-scratch
- emptyDir: {}
name: dot-cache-scratch
- emptyDir: {}
name: dot-config-scratch
- dag:
tasks:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}'
- name: task
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}'
- name: container
value: '{{workflow.parameters.implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
name: hello-world-driver
template: system-container-driver
- arguments:
parameters:
- name: pod-spec-patch
value: '{{tasks.hello-world-driver.outputs.parameters.pod-spec-patch}}'
- default: "false"
name: cached-decision
value: '{{tasks.hello-world-driver.outputs.parameters.cached-decision}}'
depends: hello-world-driver.Succeeded
name: hello-world
template: system-container-executor
inputs:
parameters:
- name: parent-dag-id
metadata: {}
name: root
outputs: {}
- container:
args:
- --type
- '{{inputs.parameters.driver-type}}'
- --pipeline_name
- namespace/n1/pipeline/hello-world
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
- '{{inputs.parameters.component}}'
- --task
- '{{inputs.parameters.task}}'
- --runtime_config
- '{{inputs.parameters.runtime-config}}'
- --iteration_index
- '{{inputs.parameters.iteration-index}}'
- --execution_id_path
- '{{outputs.parameters.execution-id.path}}'
- --iteration_count_path
- '{{outputs.parameters.iteration-count.path}}'
- --condition_path
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
name: ""
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 64Mi
inputs:
parameters:
- name: component
- default: ""
name: runtime-config
- default: ""
name: task
- default: "0"
name: parent-dag-id
- default: "-1"
name: iteration-index
- default: DAG
name: driver-type
metadata: {}
name: system-dag-driver
outputs:
parameters:
- name: execution-id
valueFrom:
path: /tmp/outputs/execution-id
- name: iteration-count
valueFrom:
default: "0"
path: /tmp/outputs/iteration-count
- name: condition
valueFrom:
default: "true"
path: /tmp/outputs/condition
- dag:
tasks:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-root}}'
- name: runtime-config
value: '{"parameters":{"text":{"stringValue":"hi there"}}}'
- name: driver-type
value: ROOT_DAG
name: root-driver
template: system-dag-driver
- arguments:
parameters:
- name: parent-dag-id
value: '{{tasks.root-driver.outputs.parameters.execution-id}}'
- name: condition
value: ""
depends: root-driver.Succeeded
name: root
template: root
inputs: {}
metadata: {}
name: entrypoint
outputs: {}
status:
finishedAt: null
startedAt: null


@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"
pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
@ -227,10 +229,6 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
state := pb.Artifact_LIVE
provider, err := objectstore.ParseProviderFromPath(artifactUri)
if err != nil {
return nil, fmt.Errorf("No Provider scheme found in artifact Uri: %s", artifactUri)
}
artifact = &pb.Artifact{
TypeId: &artifactTypeId,
State: &state,
@ -248,6 +246,24 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
}
}
	if strings.HasPrefix(artifactUri, "oci://") {
		artifactType, err := metadata.SchemaToArtifactType(schema)
		if err != nil {
			return nil, fmt.Errorf("converting schema to artifact type failed: %w", err)
		}

		if *artifactType.Name != "system.Model" {
			return nil, fmt.Errorf("the %s artifact type does not support OCI registries", *artifactType.Name)
		}

		return artifact, nil
	}

	provider, err := objectstore.ParseProviderFromPath(artifactUri)
	if err != nil {
		return nil, fmt.Errorf("no provider scheme found in artifact URI: %s", artifactUri)
	}

	// Assume all imported artifacts will rely on execution environment for store provider session info
	storeSessionInfo := objectstore.SessionInfo{
		Provider: provider,


@ -127,12 +127,51 @@ func NewLauncherV2(ctx context.Context, executionID int64, executorInputJSON, co
	}, nil
}

// stopWaitingArtifacts will create empty files to tell Modelcar sidecar containers to stop. Any errors encountered are
// logged since this is meant as a deferred function at the end of the launcher's execution.
func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
	for _, artifactList := range artifacts {
		if len(artifactList.Artifacts) == 0 {
			continue
		}

		// Following the convention of downloadArtifacts in the launcher to only look at the first in the list.
		inputArtifact := artifactList.Artifacts[0]

		// This should ideally verify that this is also a model input artifact, but this metadata doesn't seem to
		// be set on inputArtifact.
		if !strings.HasPrefix(inputArtifact.Uri, "oci://") {
			continue
		}

		localPath, err := LocalPathForURI(inputArtifact.Uri)
		if err != nil {
			continue
		}

		glog.Infof("Stopping Modelcar container for artifact %s", inputArtifact.Uri)

		launcherCompleteFile := strings.TrimSuffix(localPath, "/models") + "/launcher-complete"
		_, err = os.Create(launcherCompleteFile)
		if err != nil {
			glog.Errorf(
				"Failed to stop the artifact %s by creating %s: %v", inputArtifact.Uri, launcherCompleteFile, err,
			)
			continue
		}
	}
}

func (l *LauncherV2) Execute(ctx context.Context) (err error) {
	defer func() {
		if err != nil {
			err = fmt.Errorf("failed to execute component: %w", err)
		}
	}()

	defer stopWaitingArtifacts(l.executorInput.GetInputs().GetArtifacts())

	// publish execution regardless the task succeeds or not
	var execution *metadata.Execution
	var executorOutput *pipelinespec.ExecutorOutput
// publish execution regardless the task succeeds or not
var execution *metadata.Execution
var executorOutput *pipelinespec.ExecutorOutput
@ -401,6 +440,7 @@ func execute(
if err := downloadArtifacts(ctx, executorInput, bucket, bucketConfig, namespace, k8sClient); err != nil {
return nil, err
}
if err := prepareOutputFolders(executorInput); err != nil {
return nil, err
}
@ -441,7 +481,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
}
// Upload artifacts from local path to remote storages.
localDir, err := localPathForURI(outputArtifact.Uri)
localDir, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
} else {
@ -477,6 +517,31 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
	return outputArtifacts, nil
}

// waitForModelcar assumes the Modelcar has already been validated by the init container on the launcher
// pod. This waits for the Modelcar as a sidecar container to be ready.
func waitForModelcar(artifactURI string, localPath string) error {
	glog.Infof("Waiting for the Modelcar %s to be available", artifactURI)

	for {
		_, err := os.Stat(localPath)
		if err == nil {
			glog.Infof("The Modelcar is now available at %s", localPath)

			return nil
		}

		if !os.IsNotExist(err) {
			return fmt.Errorf(
				"failed to see if the artifact %s was ready at %s; ensure the main container and Modelcar "+
					"container have the same UID (can be set with the PIPELINE_RUN_AS_USER environment variable on "+
					"the API server): %v",
				artifactURI, localPath, err)
		}

		time.Sleep(500 * time.Millisecond)
	}
}
func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.ExecutorInput, defaultBucket *blob.Bucket, defaultBucketConfig *objectstore.Config, namespace string, k8sClient kubernetes.Interface) error {
// Read input artifact metadata.
nonDefaultBuckets, err := fetchNonDefaultBuckets(ctx, executorInput.GetInputs().GetArtifacts(), defaultBucketConfig, namespace, k8sClient)
@ -491,17 +556,31 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
	if err != nil {
		return fmt.Errorf("failed to fetch non default buckets: %w", err)
	}

	for name, artifactList := range executorInput.GetInputs().GetArtifacts() {
		// TODO(neuromage): Support concat-based placholders for arguments.
		if len(artifactList.Artifacts) == 0 {
			continue
		}

		inputArtifact := artifactList.Artifacts[0]

		localPath, err := localPathForURI(inputArtifact.Uri)
		localPath, err := LocalPathForURI(inputArtifact.Uri)
		if err != nil {
			glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
			continue
		}

		// OCI artifacts are accessed via shared storage of a Modelcar
		if strings.HasPrefix(inputArtifact.Uri, "oci://") {
			err := waitForModelcar(inputArtifact.Uri, localPath)
			if err != nil {
				return err
			}

			continue
		}

		// Copy artifact to local storage.
		copyErr := func(err error) error {
			return fmt.Errorf("failed to download input artifact %q from remote storage URI %q: %w", name, inputArtifact.Uri, err)
@ -548,6 +627,12 @@ func fetchNonDefaultBuckets(
}
// TODO: Support multiple artifacts someday, probably through the v2 engine.
artifact := artifactList.Artifacts[0]
// OCI artifacts are accessed via shared storage of a Modelcar
if strings.HasPrefix(artifact.Uri, "oci://") {
continue
}
// The artifact does not belong under the object store path for this run. Cases:
// 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath
// 2. Artifact is imported from the same bucket, but from a different path (re-use the same session)
@ -598,7 +683,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
placeholders[key] = inputArtifact.Uri
localPath, err := localPathForURI(inputArtifact.Uri)
localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
// Input Artifact does not have a recognized storage URI
continue
@ -617,7 +702,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
outputArtifact := artifactList.Artifacts[0]
placeholders[fmt.Sprintf(`{{$.outputs.artifacts['%s'].uri}}`, name)] = outputArtifact.Uri
localPath, err := localPathForURI(outputArtifact.Uri)
localPath, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
return nil, fmt.Errorf("resolve output artifact %q's local path: %w", name, err)
}
@ -720,7 +805,7 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
	return executorOutput, nil
}
func localPathForURI(uri string) (string, error) {
func LocalPathForURI(uri string) (string, error) {
if strings.HasPrefix(uri, "gs://") {
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
}
@ -730,6 +815,9 @@ func localPathForURI(uri string) (string, error) {
if strings.HasPrefix(uri, "s3://") {
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
}
if strings.HasPrefix(uri, "oci://") {
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "\\/") + "/models", nil
}
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
}
@ -747,7 +835,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
}
outputArtifact := artifactList.Artifacts[0]
localPath, err := localPathForURI(outputArtifact.Uri)
localPath, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
}


@ -18,7 +18,9 @@ import (
"context"
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"
"time"
"github.com/kubeflow/pipelines/backend/src/common/util"
@ -549,9 +551,137 @@ func initPodSpecPatch(
			Env: userEnvVar,
		}},
	}

	addModelcarsToPodSpec(executorInput.GetInputs().GetArtifacts(), userEnvVar, podSpec)

	return podSpec, nil
}
// addModelcarsToPodSpec will patch the pod spec if there are any input artifacts in the Modelcar format.
// Much of this logic is based on KServe:
// https://github.com/kserve/kserve/blob/v0.14.1/pkg/webhook/admission/pod/storage_initializer_injector.go#L131
func addModelcarsToPodSpec(
	artifacts map[string]*pipelinespec.ArtifactList,
	userEnvVar []k8score.EnvVar,
	podSpec *k8score.PodSpec,
) {
	// We need to add Modelcar containers and volumes in a deterministic order so that we can have stable naming of
	// containers and volumes. The approach taken is sorting by input artifact name and then leveraging the index
	// as a suffix to Modelcar containers and volumes added to the pod spec. The artifact name cannot be directly used
	// as it may not be a compatible Kubernetes object name.
	modelcarArtifacts := map[string]*pipelinespec.RuntimeArtifact{}
	modelcarArtifactNames := []string{}

	for name, artifactList := range artifacts {
		if len(artifactList.Artifacts) == 0 {
			continue
		}

		// Following the convention of downloadArtifacts in the launcher to only look at the first in the list.
		inputArtifact := artifactList.Artifacts[0]

		// This should ideally verify that this is also a model input artifact, but this metadata doesn't seem to
		// be set on inputArtifact.
		if !strings.HasPrefix(inputArtifact.Uri, "oci://") {
			continue
		}

		modelcarArtifacts[name] = inputArtifact
		modelcarArtifactNames = append(modelcarArtifactNames, name)
	}

	slices.Sort(modelcarArtifactNames)

	for i, name := range modelcarArtifactNames {
		inputArtifact := modelcarArtifacts[name]

		localPath, err := component.LocalPathForURI(inputArtifact.Uri)
		if err != nil {
			continue
		}

		// If there is at least one Modelcar image, then shareProcessNamespace must be enabled.
		trueVal := true
		podSpec.ShareProcessNamespace = &trueVal

		image := strings.TrimPrefix(inputArtifact.Uri, "oci://")

		podSpec.InitContainers = append(
			podSpec.InitContainers,
			k8score.Container{
				Name:  fmt.Sprintf("oci-prepull-%d", i),
				Image: image,
				Command: []string{
					"sh",
					"-c",
					// Check that the expected models directory exists
					// Taken from KServe:
					// https://github.com/kserve/kserve/blob/v0.14.1/pkg/webhook/admission/pod/storage_initializer_injector.go#L732
					"echo 'Pre-fetching modelcar " + image + ": ' && [ -d /models ] && " +
						"[ \"$$(ls -A /models)\" ] && echo 'OK ... Prefetched and valid (/models exists)' || " +
						"(echo 'NOK ... Prefetched but modelcar is invalid (/models does not exist or is empty)' && " +
						" exit 1)",
				},
				Env:                      userEnvVar,
				TerminationMessagePolicy: k8score.TerminationMessageFallbackToLogsOnError,
			},
		)

		volumeName := fmt.Sprintf("oci-%d", i)
		podSpec.Volumes = append(
			podSpec.Volumes,
			k8score.Volume{
				Name: volumeName,
				VolumeSource: k8score.VolumeSource{
					EmptyDir: &k8score.EmptyDirVolumeSource{},
				},
			},
		)

		mountPath := strings.TrimSuffix(localPath, "/models")
		emptyDirVolumeMount := k8score.VolumeMount{
			Name:      volumeName,
			MountPath: mountPath,
			SubPath:   strings.TrimPrefix(mountPath, "/oci/"),
		}

		podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, emptyDirVolumeMount)

		podSpec.Containers = append(
			podSpec.Containers,
			k8score.Container{
				Name:            fmt.Sprintf("oci-%d", i),
				Image:           image,
				ImagePullPolicy: "IfNotPresent",
				Env:             userEnvVar,
				VolumeMounts:    []k8score.VolumeMount{emptyDirVolumeMount},
				Command: []string{
					"sh",
					"-c",
					// $$$$ gets escaped by YAML to $$, which is the current PID
					// This container will sleep until the main container finishes execution and
					// communicates its exit via a file creation, at which point this container
					// will then also exit.
					// This approach is taken instead of having the main container send a SIGHUP to the
					// sleep process to avoid the need for the SYS_PTRACE capability which is not always available
					// depending on the security context restrictions.
					// This approach is inspired by KServe:
					// https://github.com/kserve/kserve/blob/v0.14.1/pkg/webhook/admission/pod/storage_initializer_injector.go#L732
					fmt.Sprintf(
						"ln -s /proc/$$$$/root/models \"%s\" && "+
							"echo \"Running Modelcar container...\" && "+
							"until [ -f \"%s/launcher-complete\" ]; do sleep 1; done",
						localPath, mountPath,
					),
				},
				TerminationMessagePolicy: k8score.TerminationMessageFallbackToLogsOnError,
			},
		)
	}
}
// Extends the PodSpec to include Kubernetes-specific executor config.
func extendPodSpecPatch(
podSpec *k8score.PodSpec,


@ -391,6 +391,54 @@ func Test_initPodSpecPatch_legacy_resources(t *testing.T) {
assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")])
}
func Test_initPodSpecPatch_modelcar_input_artifact(t *testing.T) {
	containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{
		Image:   "python:3.9",
		Args:    []string{"--function_to_execute", "add"},
		Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"},
	}
	componentSpec := &pipelinespec.ComponentSpec{}

	executorInput := &pipelinespec.ExecutorInput{
		Inputs: &pipelinespec.ExecutorInput_Inputs{
			Artifacts: map[string]*pipelinespec.ArtifactList{
				"my-model": {
					Artifacts: []*pipelinespec.RuntimeArtifact{
						{
							Uri: "oci://registry.domain.local/my-model:latest",
						},
					},
				},
			},
		},
	}

	podSpec, err := initPodSpecPatch(
		containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300",
	)
	assert.Nil(t, err)
	assert.Len(t, podSpec.InitContainers, 1)
	assert.Equal(t, podSpec.InitContainers[0].Name, "oci-prepull-0")
	assert.Equal(t, podSpec.InitContainers[0].Image, "registry.domain.local/my-model:latest")
	assert.Len(t, podSpec.Volumes, 1)
	assert.Equal(t, podSpec.Volumes[0].Name, "oci-0")
	assert.NotNil(t, podSpec.Volumes[0].EmptyDir)
	assert.Len(t, podSpec.Containers, 2)
	assert.Len(t, podSpec.Containers[0].VolumeMounts, 1)
	assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].Name, "oci-0")
	assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].MountPath, "/oci/registry.domain.local\\/my-model:latest")
	assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].SubPath, "registry.domain.local\\/my-model:latest")
	assert.Equal(t, podSpec.Containers[1].Name, "oci-0")
	assert.Equal(t, podSpec.Containers[1].Image, "registry.domain.local/my-model:latest")
	assert.Len(t, podSpec.Containers[1].VolumeMounts, 1)
	assert.Equal(t, podSpec.Containers[1].VolumeMounts[0].Name, "oci-0")
	assert.Equal(t, podSpec.Containers[1].VolumeMounts[0].MountPath, "/oci/registry.domain.local\\/my-model:latest")
	assert.Equal(t, podSpec.Containers[1].VolumeMounts[0].SubPath, "registry.domain.local\\/my-model:latest")
}
func Test_makeVolumeMountPatch(t *testing.T) {
type args struct {
pvcMount []*kubernetesplatform.PvcMount


@ -23,6 +23,14 @@ python3 -m pip install -r ./requirements-sample-test.txt
popd
if [[ -n "${PULL_NUMBER}" ]]; then
  export KFP_PACKAGE_PATH="git+https://github.com/kubeflow/pipelines@refs/pull/${PULL_NUMBER}/merge#egg=kfp&subdirectory=sdk/python"
else
  export KFP_PACKAGE_PATH='git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python'
fi
python3 -m pip install $KFP_PACKAGE_PATH
# The -u flag makes python output unbuffered, so that we can see real time log.
# Reference: https://stackoverflow.com/a/107717
python3 -u ./samples/v2/sample_test.py


@ -0,0 +1,16 @@
FROM python:3.13-slim AS base
USER 0
RUN pip install huggingface-hub
# Download a small model file from Hugging Face
RUN python -c "from huggingface_hub import snapshot_download; snapshot_download(repo_id='openai/whisper-tiny', local_dir='/models',allow_patterns=['*.safetensors', '*.json', '*.txt'], revision='169d4a4341b33bc18d8881c4b69c2e104e1cc0af')"
# Final image containing only the essential model files
FROM alpine:3.19
RUN mkdir /models
# Copy the model files from the base container
COPY --from=base /models /models


@ -0,0 +1,75 @@
#!/usr/bin/env python3
# Copyright 2025 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline that imports a model in the Modelcar format from an OCI registry."""
import os
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
# In tests, we install a KFP package from the PR under test. Users should not
# normally need to specify `kfp_package_path` in their component definitions.
_KFP_PACKAGE_PATH = os.getenv("KFP_PACKAGE_PATH")
@dsl.component(kfp_package_path=_KFP_PACKAGE_PATH)
def get_model_files_list(input_model: dsl.Input[dsl.Model]) -> str:
    import os
    import os.path

    if not os.path.exists(input_model.path):
        raise RuntimeError(f"The model does not exist at: {input_model.path}")

    expected_files = {
        "added_tokens.json",
        "config.json",
        "generation_config.json",
        "merges.txt",
        "model.safetensors",
        "normalizer.json",
        "preprocessor_config.json",
        "special_tokens_map.json",
        "tokenizer.json",
        "tokenizer_config.json",
        "vocab.json",
    }

    filesInPath = set(os.listdir(input_model.path))
    if not filesInPath.issuperset(expected_files):
        raise RuntimeError(
            "The model does not have expected files: "
            + ", ".join(sorted(expected_files.difference(filesInPath)))
        )

    return ", ".join(sorted(filesInPath))


@dsl.pipeline(name="pipeline-with-modelcar-model")
def pipeline_modelcar_import(
    model_uri: str = "oci://registry.domain.local/modelcar:test",
):
    model_source_oci_task = dsl.importer(
        artifact_uri=model_uri, artifact_class=dsl.Model
    )

    get_model_files_list(
        input_model=model_source_oci_task.output).set_caching_options(False)


if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=pipeline_modelcar_import, package_path=__file__ + ".yaml"
    )
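
For local testing, the sample above can be submitted roughly as follows. This
is a sketch, not part of the commit: it assumes the KFP API server has been
port-forwarded to localhost:8888 (as the CI workflow does) and that the script
is run from the samples/v2/modelcar_import directory; the endpoint and
argument values are illustrative.

from kfp.client import Client

from modelcar_import import pipeline_modelcar_import

# Assumed endpoint of the port-forwarded ml-pipeline service.
client = Client(host="http://localhost:8888")
run = client.create_run_from_pipeline_func(
    pipeline_modelcar_import,
    arguments={"model_uri": "oci://registry.domain.local/modelcar:test"},
)
print(f"Started run {run.run_id}")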


@ -30,6 +30,7 @@ import producer_consumer_param
import subdagio
import two_step_pipeline_containerized
import pipeline_with_placeholders
from modelcar_import import modelcar_import
_MINUTE = 60 # seconds
_DEFAULT_TIMEOUT = 5 * _MINUTE
@ -76,6 +77,7 @@ class SampleTest(unittest.TestCase):
            TestCase(
                pipeline_func=subdagio.multiple_artifacts_namedtuple.crust),
            TestCase(pipeline_func=pipeline_with_placeholders.pipeline_with_placeholders),
            TestCase(pipeline_func=modelcar_import.pipeline_modelcar_import),
        ]

        with ThreadPoolExecutor() as executor:


@ -20,10 +20,12 @@ import warnings
_GCS_LOCAL_MOUNT_PREFIX = '/gcs/'
_MINIO_LOCAL_MOUNT_PREFIX = '/minio/'
_S3_LOCAL_MOUNT_PREFIX = '/s3/'
_OCI_LOCAL_MOUNT_PREFIX = '/oci/'
GCS_REMOTE_PREFIX = 'gs://'
MINIO_REMOTE_PREFIX = 'minio://'
S3_REMOTE_PREFIX = 's3://'
OCI_REMOTE_PREFIX = 'oci://'
class Artifact:
@ -94,6 +96,10 @@ class Artifact:
):]
        elif self.uri.startswith(S3_REMOTE_PREFIX):
            return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(S3_REMOTE_PREFIX):]
        elif self.uri.startswith(OCI_REMOTE_PREFIX):
            escaped_uri = self.uri[len(OCI_REMOTE_PREFIX):].replace('/', '\\/')
            return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
        # uri == path for local execution
        return self.uri
@ -108,6 +114,13 @@ def convert_local_path_to_remote_path(path: str) -> str:
        return MINIO_REMOTE_PREFIX + path[len(_MINIO_LOCAL_MOUNT_PREFIX):]
    elif path.startswith(_S3_LOCAL_MOUNT_PREFIX):
        return S3_REMOTE_PREFIX + path[len(_S3_LOCAL_MOUNT_PREFIX):]
    elif path.startswith(_OCI_LOCAL_MOUNT_PREFIX):
        remote_path = path[len(_OCI_LOCAL_MOUNT_PREFIX):].replace('\\/', '/')
        if remote_path.endswith("/models"):
            remote_path = remote_path[:-len("/models")]
        return OCI_REMOTE_PREFIX + remote_path

    return path
@ -128,6 +141,15 @@ class Model(Artifact):
    def _get_framework(self) -> str:
        return self.metadata.get('framework', '')

    @property
    def path(self) -> str:
        if self.uri.startswith("oci://"):
            # Modelcar container images are expected to have the model files stored in /models
            # https://github.com/kserve/kserve/blob/v0.14.1/pkg/webhook/admission/pod/storage_initializer_injector.go#L732
            return self._get_path() + "/models"

        return self._get_path()

    @framework.setter
    def framework(self, framework: str) -> None:
        self._set_framework(framework)
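
As a small sketch of the path mapping added above (the image reference is an
illustrative placeholder; the expected strings follow from the escaping logic
in this file and from the test below):

from kfp.dsl import Model
from kfp.dsl.types.artifact_types import convert_local_path_to_remote_path

model = Model(uri="oci://quay.io/org/repo:latest")
# "/" in the image reference is escaped and "/models" is appended for Modelcars.
assert model.path == "/oci/quay.io\\/org\\/repo:latest/models"
# Converting back strips the "/models" suffix and unescapes the separators.
assert convert_local_path_to_remote_path(model.path) == "oci://quay.io/org/repo:latest"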


@ -138,6 +138,9 @@ class TestConvertLocalPathToRemotePath(parameterized.TestCase):
        ('/gcs/foo/bar', 'gs://foo/bar'),
        ('/minio/foo/bar', 'minio://foo/bar'),
        ('/s3/foo/bar', 's3://foo/bar'),
        ('/oci/quay.io\\/org\\/repo:latest/models',
         'oci://quay.io/org/repo:latest'),
        ('/oci/quay.io\\/org\\/repo:latest', 'oci://quay.io/org/repo:latest'),
        ('/tmp/kfp_outputs', '/tmp/kfp_outputs'),
        ('/some/random/path', '/some/random/path'),
    ]])