feat(backend): Add support for Pythonic artifacts (#12256)

Resolves:
https://github.com/kubeflow/pipelines/issues/12098

Signed-off-by: mprahl <mprahl@users.noreply.github.com>
Matt Prahl 2025-09-19 07:35:36 -04:00 committed by GitHub
parent 6e6d0641eb
commit 4471829cdb
3 changed files with 64 additions and 2 deletions


@@ -16,6 +16,7 @@ package driver
import (
    "fmt"
    "path/filepath"
    "slices"
    "strings"
@@ -640,7 +641,6 @@ func provisionOutputs(
    outputs := &pipelinespec.ExecutorInput_Outputs{
        Artifacts:  make(map[string]*pipelinespec.ArtifactList),
        Parameters: make(map[string]*pipelinespec.ExecutorInput_OutputParameter),
        OutputFile: component.OutputMetadataFilepath,
    }
    artifacts := outputsSpec.GetArtifacts()
@@ -660,13 +660,23 @@ func provisionOutputs(
        }
    }

    // Compute a task-root remote URI that will serve as the base for all
    // output artifacts and the executor output file. This enables Pythonic
    // artifacts (dsl.get_uri) by allowing the SDK to infer the task root from
    // the executor output file's directory (set below) and convert it back to
    // a remote URI at runtime.
    taskRootRemote := metadata.GenerateOutputURI(pipelineRoot, []string{taskName, outputURISalt}, false)

    // Set per-artifact output URIs under the task root.
    for name, artifact := range artifacts {
        outputs.Artifacts[name] = &pipelinespec.ArtifactList{
            Artifacts: []*pipelinespec.RuntimeArtifact{
                {
                    // Required by Pythonic artifacts to avoid a key error in the SDK.
                    Name: name,
                    // Do not preserve the query string for output artifacts, as otherwise
                    // it would appear in file and artifact names.
                    Uri: metadata.GenerateOutputURI(pipelineRoot, []string{taskName, outputURISalt, name}, false),
                    Uri: metadata.GenerateOutputURI(taskRootRemote, []string{name}, false),
                    Type: artifact.GetArtifactType(),
                    Metadata: artifact.GetMetadata(),
                },
@@ -680,6 +690,15 @@ func provisionOutputs(
        }
    }

    // Place the executor output file under localTaskRoot to enable Pythonic artifacts. The SDK's
    // Pythonic artifact runtime derives CONTAINER_TASK_ROOT from the directory of OutputFile and
    // uses it in dsl.get_uri.
    if localTaskRoot, err := component.LocalPathForURI(taskRootRemote); err == nil {
        outputs.OutputFile = filepath.Join(localTaskRoot, "output_metadata.json")
    } else {
        // Fall back to the legacy path if the pipeline root scheme is not recognized.
        outputs.OutputFile = component.OutputMetadataFilepath
    }

    return outputs
}
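For context, here is a rough Python sketch of the SDK-side inference that this executor output file placement enables, assuming the common KFP convention of mounting object stores at /gcs/, /s3/, and /minio/; the helper names and example paths are illustrative and do not reflect the SDK's actual internals.

import os

# Illustrative mapping between in-container mount prefixes and remote URI schemes
# (an assumption about the environment, not the SDK's real lookup table).
_LOCAL_PREFIXES = {"/gcs/": "gs://", "/s3/": "s3://", "/minio/": "minio://"}


def infer_task_root(output_file: str) -> str:
    # The task root is simply the directory that holds the executor output file,
    # mirroring how the driver now joins "output_metadata.json" onto localTaskRoot.
    return os.path.dirname(output_file)


def to_remote_uri(local_path: str) -> str:
    # Convert the local task root back to its remote object-store URI.
    for prefix, scheme in _LOCAL_PREFIXES.items():
        if local_path.startswith(prefix):
            return scheme + local_path[len(prefix):]
    raise ValueError(f"unrecognized local path: {local_path}")


if __name__ == "__main__":
    # Hypothetical path matching the layout produced by provisionOutputs above.
    output_file = "/minio/mlpipeline/v2/artifacts/my-pipeline/gen-data/abc123/output_metadata.json"
    print(to_remote_uri(infer_task_root(output_file)))
    # -> minio://mlpipeline/v2/artifacts/my-pipeline/gen-data/abc123

An artifact URI returned by dsl.get_uri("model") would then be this task root with "model" appended, matching the per-artifact URIs generated from taskRootRemote above.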


@@ -0,0 +1,41 @@
from kfp import dsl
from kfp import compiler
from kfp.dsl import Dataset, Model


@dsl.component
def gen_data() -> Dataset:
    dataset = Dataset(uri=dsl.get_uri())
    with open(dataset.path, "w") as f:
        f.write("some data")
    dataset.metadata["length"] = len("some data")
    return dataset


@dsl.component
def train_model(dataset: Dataset) -> Model:
    with open(dataset.path) as f:
        lines = f.read()
    assert lines == "some data"
    assert dataset.metadata["length"] == len("some data")
    model_artifact = Model(uri=dsl.get_uri("model"))
    with open(model_artifact.path, "w") as f:
        f.write("model trained")
    return model_artifact


@dsl.pipeline(name="pythonic-artifacts-test")
def pythonic_artifacts_test_pipeline():
    t1 = gen_data().set_caching_options(False)
    train_model(dataset=t1.output).set_caching_options(False)


if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=pythonic_artifacts_test_pipeline,
        package_path=__file__.replace(".py", ".yaml"),
    )
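As a usage note, the compiled package from the sample above could be submitted with the standard KFP client along these lines; the host value is a placeholder, not a real endpoint.

from kfp import Client

# Placeholder endpoint; point this at an actual KFP API server.
client = Client(host="http://localhost:8080")

run = client.create_run_from_pipeline_package(
    "pythonic_artifacts_test_pipeline.yaml",
)
print(run.run_id)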


@@ -80,6 +80,7 @@ import pipeline_with_workspace
from modelcar import modelcar
import pipeline_with_utils
import task_config
import pythonic_artifacts_test_pipeline
_MINUTE = 60 # seconds
@@ -257,6 +258,7 @@ class SampleTest(unittest.TestCase):
            TestCase(pipeline_func=pipeline_with_workspace.pipeline_with_workspace),
            TestCase(pipeline_func=pipeline_with_utils.pipeline_with_utils),
            TestCase(pipeline_func=task_config.pipeline_task_config),
            TestCase(pipeline_func=pythonic_artifacts_test_pipeline.pythonic_artifacts_test_pipeline),
        ]

        with ThreadPoolExecutor() as executor: