feat(sdk): Introduce experimental v2-compatibility in KFP SDK (#5218)

* WIP: Enable v2 compatibility in KFP SDK compiler.

* First pass clean up

* Clean up and introduce enum instead of boolean for execution mode.

* More cleanup

* Clean up and add comments.

* Undo formatting changes.

* Undo formatting changes.

* Add method to unconditionally add kfp pod env.

* minor formatting change.

* Update docstrings.

* Undo formatting changes.

* fix imports.

* fix pod_env tests

* rebased.

* undo format changes.

* Undo compiler changes.

* format _default_transformers.py for consistency

* Fix various rebasing issues.

* fix bug referring to pipeline_name/pipeline_root in v1 pipelines.

* revert output dir name.

* allow both types of attributes for pipeline root.

* fix pod env yaml golden.

* fix for input/output uri tests.

* Add v2 compatible compiler test.

* Use ordereddict to fix flaky golden file tests.

* Address PR comments.

* Address PR comments.

* Address PR comments.
Ajay Gopinathan 2021-03-08 15:40:23 -08:00 committed by GitHub
parent a0b57c312b
commit 83eded130c
10 changed files with 685 additions and 113 deletions
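At a high level, the new mode is opted into through the compiler. A minimal sketch using only names that appear in this diff; the pipeline body is elided and purely illustrative (a real pipeline needs at least one ContainerOp):

from kfp import compiler, dsl

@dsl.pipeline(name='my-test-pipeline',
              pipeline_root='gs://output-directory/v2-artifacts')
def my_test_pipeline():
    ...  # instantiate component ops here

# V2_COMPATIBLE compiles Argo YAML that routes each step through the
# kfp-launcher so step metadata is recorded with v2 semantics.
compiler.Compiler(mode=dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=my_test_pipeline, package_path='my_test_pipeline.yaml')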


@ -25,9 +25,9 @@ import zipfile
import datetime
from typing import Mapping, Callable, Optional
import kfp
import kfp_server_api
from kfp import dsl
from kfp.compiler import compiler
from kfp.compiler._k8s_helper import sanitize_k8s_name
@ -272,7 +272,7 @@ class Client(object):
self._context_setting = {
'namespace': '',
}
def _refresh_api_client_token(self):
"""Refreshes the existing token associated with the kfp_api_client."""
if getattr(self, '_is_refresh_token', None):
@ -283,7 +283,7 @@ class Client(object):
def set_user_namespace(self, namespace):
"""Set user namespace into local context setting file.
This function should only be used when Kubeflow Pipelines is in the multi-user mode.
Args:
@ -408,7 +408,7 @@ class Client(object):
namespace: Kubernetes namespace where the experiment was created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
A response object including a list of experiments and next page token.
"""
@ -620,14 +620,14 @@ class Client(object):
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The id of a pipeline.
version_id: The id of a pipeline version.
If both pipeline_id and version_id are specified, version_id will take precedence.
If only pipeline_id is specified, the default version of this pipeline is used to create the run.
Returns:
A JobConfig object with attributes spec and resource_reference.
"""
class JobConfig:
def __init__(self, spec, resource_references):
self.spec = spec
@ -666,8 +666,9 @@ class Client(object):
arguments: Mapping[str, str],
run_name: Optional[str] = None,
experiment_name: Optional[str] = None,
pipeline_conf: Optional[kfp.dsl.PipelineConf] = None,
namespace: Optional[str] = None):
pipeline_conf: Optional[dsl.PipelineConf] = None,
namespace: Optional[str] = None,
mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY):
"""Runs pipeline on KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
@ -682,14 +683,25 @@ class Client(object):
namespace: Kubernetes namespace where the pipeline runs are created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
mode: The PipelineExecutionMode to use when compiling and running
pipeline_func.
"""
#TODO: Check arguments against the pipeline function
pipeline_name = pipeline_func.__name__
run_name = run_name or pipeline_name + ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
with tempfile.TemporaryDirectory() as tmpdir:
pipeline_package_path = os.path.join(tmpdir, 'pipeline.yaml')
compiler.Compiler().compile(pipeline_func, pipeline_package_path, pipeline_conf=pipeline_conf)
return self.create_run_from_pipeline_package(pipeline_package_path, arguments, run_name, experiment_name, namespace)
compiler.Compiler(mode=mode).compile(
pipeline_func=pipeline_func,
package_path=pipeline_package_path,
pipeline_conf=pipeline_conf)
return self.create_run_from_pipeline_package(
pipeline_file=pipeline_package_path,
arguments=arguments,
run_name=run_name,
experiment_name=experiment_name,
namespace=namespace)
def create_run_from_pipeline_package(
self,
@ -837,7 +849,7 @@ class Client(object):
> _GCP_ACCESS_TOKEN_TIMEOUT):
self._refresh_api_client_token()
last_token_refresh_time = datetime.datetime.now()
get_run_response = self._run_api.get_run(run_id=run_id)
status = get_run_response.run.status
elapsed_time = (datetime.datetime.now() - start_time).total_seconds()
@ -912,8 +924,8 @@ class Client(object):
pipeline_id = self.get_pipeline_id(pipeline_name)
response = self._upload_api.upload_pipeline_version(
pipeline_package_path,
name=pipeline_version_name,
pipelineid=pipeline_id
)
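For reference, a hedged usage sketch of the new mode argument on create_run_from_pipeline_func; the host value below is illustrative, and the pipeline function is the one sketched near the top of this page:

import kfp
from kfp import dsl

client = kfp.Client(host='http://localhost:8080')  # illustrative endpoint
client.create_run_from_pipeline_func(
    my_test_pipeline,          # pipeline function from the earlier sketch
    arguments={},
    mode=dsl.PipelineExecutionMode.V2_COMPATIBLE)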


@ -12,30 +12,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
from kubernetes import client as k8s_client
from ..dsl._container_op import BaseOp, ContainerOp
def add_pod_env(op: BaseOp) -> BaseOp:
"""Adds pod environment info to ContainerOp.
"""Adds environment info if the Pod has the label `add-pod-env = true`.
"""
if isinstance(op, ContainerOp) and op.pod_labels and 'add-pod-env' in op.pod_labels and op.pod_labels['add-pod-env'] == 'true':
from kubernetes import client as k8s_client
op.container.add_env_variable(
k8s_client.V1EnvVar(
name='KFP_POD_NAME',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.name'
)
)
)
).add_env_variable(
k8s_client.V1EnvVar(
name='KFP_NAMESPACE',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.namespace'
)
)
)
)
return op
if isinstance(
op, ContainerOp
) and op.pod_labels and 'add-pod-env' in op.pod_labels and op.pod_labels[
'add-pod-env'] == 'true':
return add_kfp_pod_env(op)
def add_kfp_pod_env(op: BaseOp) -> BaseOp:
"""Adds KFP pod environment info to the specified ContainerOp.
"""
if not isinstance(op, ContainerOp):
warnings.warn(
'Trying to add default KFP environment variables to an Op that is '
'not a ContainerOp. Ignoring request.')
return op
op.container.add_env_variable(
k8s_client.V1EnvVar(name='KFP_POD_NAME',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.name')))
).add_env_variable(
k8s_client.V1EnvVar(name='KFP_NAMESPACE',
value_from=k8s_client.V1EnvVarSource(
field_ref=k8s_client.V1ObjectFieldSelector(
field_path='metadata.namespace')))
).add_env_variable(
k8s_client.V1EnvVar(
name='WORKFLOW_ID',
value_from=k8s_client.
V1EnvVarSource(field_ref=k8s_client.V1ObjectFieldSelector(
field_path="metadata.labels['workflows.argoproj.io/workflow']"))))
return op
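The new add_kfp_pod_env transformer can also be applied explicitly to every op via the existing PipelineConf hook; a short sketch (the conf object below is illustrative):

from kfp import dsl
from kfp.compiler._default_transformers import add_kfp_pod_env

conf = dsl.PipelineConf()
# Injects the KFP_POD_NAME, KFP_NAMESPACE and WORKFLOW_ID downward-API
# env vars into every ContainerOp in the pipeline.
conf.add_op_transformer(add_kfp_pod_env)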


@ -25,7 +25,7 @@ from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional
import kfp
from kfp.dsl import _for_loop
from kfp.compiler import _data_passing_rewriter
from kfp.compiler import _data_passing_rewriter, v2_compat
from .. import dsl
from ._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name
@ -41,7 +41,7 @@ from ..dsl._pipeline_param import extract_pipelineparams_from_any, PipelineParam
class Compiler(object):
"""DSL Compiler that compiles pipeline functions into workflow yaml.
Example:
How to use the compiler to construct workflow yaml::
@ -55,6 +55,19 @@ class Compiler(object):
Compiler().compile(my_pipeline, 'path/to/workflow.yaml')
"""
def __init__(
self,
mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY):
if mode == dsl.PipelineExecutionMode.V2_ENGINE:
raise ValueError('V2_ENGINE execution mode is not supported yet.')
if mode == dsl.PipelineExecutionMode.V2_COMPATIBLE:
warnings.warn('V2_COMPATIBLE execution mode is still under development.'
' Pipelines may not work as expected.')
self._mode = mode
self._pipeline_name_param: Optional[dsl.PipelineParam] = None
self._pipeline_root_param: Optional[dsl.PipelineParam] = None
def _get_groups_for_ops(self, root_group):
"""Helper function to get belonging groups for each op.
@ -319,9 +332,10 @@ class Compiler(object):
# Generate the input for SubGraph along with parallelfor
for sub_graph in opsgroup_groups:
if sub_graph in op_name_to_for_loop_op:
# The opsgroup list is sorted with the farthest group as the first and
# the opsgroup itself as the last. To get the latest opsgroup which is
# not the opsgroup itself -2 is used.
parent = opsgroup_groups[sub_graph][-2]
if parent and parent.startswith('subgraph'):
# propagate only op's pipeline param from subgraph to parallelfor
loop_op = op_name_to_for_loop_op[sub_graph]
@ -428,7 +442,7 @@ class Compiler(object):
inputs, outputs, dependencies are all helper dicts.
"""
template = {'name': group.name}
if group.parallelism != None:
template["parallelism"] = group.parallelism
# Generate inputs section.
@ -538,7 +552,7 @@ class Compiler(object):
# We will sort dependencies to have deterministic yaml and thus stable tests
if task.get('dependencies'):
task['dependencies'].sort()
tasks.append(task)
tasks.sort(key=lambda x: x['name'])
@ -647,11 +661,19 @@ class Compiler(object):
templates.append(template)
for op in pipeline.ops.values():
if self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE:
v2_compat.update_op(op,
pipeline_name=self._pipeline_name_param,
pipeline_root=self._pipeline_root_param)
templates.extend(op_to_templates_handler(op))
return templates
def _create_pipeline_workflow(self, parameter_defaults, pipeline, op_transformers=None, pipeline_conf=None):
def _create_pipeline_workflow(self,
parameter_defaults,
pipeline,
op_transformers=None,
pipeline_conf=None):
"""Create workflow for the pipeline."""
# Input Parameters
@ -727,7 +749,7 @@ class Compiler(object):
if exit_handler:
workflow['spec']['onExit'] = exit_handler.name
# This can be overwritten by the task specific
# nodeselection, specified in the template.
if pipeline_conf.default_pod_node_selector:
workflow['spec']['nodeSelector'] = pipeline_conf.default_pod_node_selector
@ -804,13 +826,14 @@ class Compiler(object):
sanitized_ops[sanitized_name] = op
pipeline.ops = sanitized_ops
def _create_workflow(self,
def _create_workflow(
self,
pipeline_func: Callable,
pipeline_name: Text=None,
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf = None,
) -> Dict[Text, Any]:
pipeline_name: Optional[Text] = None,
pipeline_description: Optional[Text] = None,
params_list: Optional[List[dsl.PipelineParam]] = None,
pipeline_conf: Optional[dsl.PipelineConf] = None,
) -> Dict[Text, Any]:
""" Internal implementation of create_workflow."""
params_list = params_list or []
@ -825,11 +848,10 @@ class Compiler(object):
# will be resolved immediately in place when being to each component.
default_param_values = OrderedDict()
if getattr(pipeline_func, 'output_directory', None):
dsl_pipeline_root = dsl.PipelineParam(
name=dsl.ROOT_PARAMETER_NAME, value=pipeline_func.output_directory)
pipeline_func.output_directory = dsl_pipeline_root
params_list.append(dsl_pipeline_root)
if self._pipeline_root_param:
params_list.append(self._pipeline_root_param)
if self._pipeline_name_param:
params_list.append(self._pipeline_name_param)
for param in params_list:
default_param_values[param.name] = param.value
@ -882,6 +904,11 @@ class Compiler(object):
op_transformers = [add_pod_env]
op_transformers.extend(pipeline_conf.op_transformers)
if self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE:
for op in dsl_pipeline.ops.values():
op.inputs.append(self._pipeline_name_param)
op.inputs.append(self._pipeline_root_param)
workflow = self._create_pipeline_workflow(
args_list_with_defaults,
dsl_pipeline,
@ -892,9 +919,8 @@ class Compiler(object):
from ._data_passing_rewriter import fix_big_data_passing
workflow = fix_big_data_passing(workflow)
output_directory = getattr(pipeline_func, 'output_directory', None)
workflow = _data_passing_rewriter.add_pod_name_passing(
workflow, str(output_directory))
workflow, str(self._pipeline_root_param or None))
if pipeline_conf and pipeline_conf.data_passing_method != None:
workflow = pipeline_conf.data_passing_method(workflow)
@ -906,6 +932,9 @@ class Compiler(object):
annotations['pipelines.kubeflow.org/pipeline_compilation_time'] = datetime.datetime.now().isoformat()
annotations['pipelines.kubeflow.org/pipeline_spec'] = json.dumps(pipeline_meta.to_dict(), sort_keys=True)
if self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE:
annotations['pipelines.kubeflow.org/v2_pipeline'] = "true"
# Labels might be logged better than annotations so adding some information here as well
labels = metadata.setdefault('labels', {})
labels['pipelines.kubeflow.org/kfp_sdk_version'] = kfp.__version__
@ -948,7 +977,11 @@ class Compiler(object):
"""Compile the given pipeline function into workflow."""
return self._create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)
def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
def compile(self,
pipeline_func,
package_path,
type_check: bool = True,
pipeline_conf: Optional[dsl.PipelineConf] = None):
"""Compile the given pipeline function into workflow yaml.
Args:
@ -960,6 +993,17 @@ class Compiler(object):
pull secrets and other pipeline-level configuration options. Overrides
any configuration that may be set by the pipeline.
"""
pipeline_root_dir = getattr(pipeline_func, 'output_directory', None)
if (pipeline_root_dir is not None or
self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE):
self._pipeline_root_param = dsl.PipelineParam(
name=dsl.ROOT_PARAMETER_NAME, value=pipeline_root_dir or '')
if self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE:
pipeline_name = getattr(pipeline_func, '_component_human_name', '')
self._pipeline_name_param = dsl.PipelineParam(name='pipeline-name',
value=pipeline_name)
import kfp
type_check_old_value = kfp.TYPE_CHECK
try:
@ -988,10 +1032,10 @@ class Compiler(object):
from contextlib import closing
from io import BytesIO
with tarfile.open(package_path, "w:gz") as tar:
with closing(BytesIO(yaml_text.encode())) as yaml_file:
tarinfo = tarfile.TarInfo('pipeline.yaml')
tarinfo.size = len(yaml_file.getvalue())
tar.addfile(tarinfo, fileobj=yaml_file)
elif package_path.endswith('.zip'):
with zipfile.ZipFile(package_path, "w") as zip:
zipinfo = zipfile.ZipInfo('pipeline.yaml')
@ -1050,7 +1094,7 @@ Please create a new issue at https://github.com/kubeflow/pipelines/issues attach
has_working_argo_lint = _run_argo_lint('')
except:
warnings.warn("Cannot validate the compiled workflow. Found the argo program in PATH, but it's not usable. argo v2.4.3 should work.")
if has_working_argo_lint:
_run_argo_lint(yaml_text)
@ -1074,13 +1118,11 @@ def _run_argo_lint(yaml_text: str):
'yet. Otherwise, please create a new issue at '
'https://github.com/kubeflow/pipelines/issues attaching the '
'pipeline code and the pipeline package. Error: {}'.format(
result.stderr.decode('utf-8'))
)
result.stderr.decode('utf-8')))
raise RuntimeError(
'''Internal compiler error: Compiler has produced Argo-incompatible workflow.
'''Internal compiler error: Compiler has produced Argo-incompatible workflow.
Please create a new issue at https://github.com/kubeflow/pipelines/issues attaching the pipeline code and the pipeline package.
Error: {}'''.format(result.stderr.decode('utf-8'))
)
Error: {}'''.format(result.stderr.decode('utf-8')))
return True
return False
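To summarize the constructor behavior introduced above, a sketch (the warning text is paraphrased in the comment):

import warnings
from kfp import compiler, dsl

compiler.Compiler()          # defaults to V1_LEGACY, plain Argo YAML

with warnings.catch_warnings(record=True):
    # Emits a warning: V2_COMPATIBLE is still under development and
    # pipelines may not work as expected.
    compiler.Compiler(mode=dsl.PipelineExecutionMode.V2_COMPATIBLE)

try:
    compiler.Compiler(mode=dsl.PipelineExecutionMode.V2_ENGINE)
except ValueError:
    pass                     # V2_ENGINE is not supported yet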


@ -0,0 +1,265 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: my-test-pipeline-
annotations:
pipelines.kubeflow.org/kfp_sdk_version: 1.4.1
pipelines.kubeflow.org/pipeline_compilation_time: '2021-03-08T06:53:16.328199'
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "gs://output-directory/v2-artifacts",
"name": "pipeline-output-directory"}, {"default": "my-test-pipeline", "name":
"pipeline-name"}], "name": "my-test-pipeline"}'
pipelines.kubeflow.org/v2_pipeline: "true"
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.4.1}
spec:
entrypoint: my-test-pipeline
templates:
- name: my-test-pipeline
inputs:
parameters:
- {name: pipeline-name}
- {name: pipeline-output-directory}
dag:
tasks:
- name: preprocess
template: preprocess
arguments:
parameters:
- {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- name: train
template: train
dependencies: [preprocess]
arguments:
parameters:
- {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: preprocess-output_parameter_one, value: '{{tasks.preprocess.outputs.parameters.preprocess-output_parameter_one}}'}
artifacts:
- {name: preprocess-output_dataset_one, from: '{{tasks.preprocess.outputs.artifacts.preprocess-output_dataset_one}}'}
- name: preprocess
container:
args:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def preprocess(uri, some_int, output_parameter_one,
output_dataset_one):
'''Dummy Preprocess Step.'''
with open(output_dataset_one, 'w') as f:
f.write('Output dataset')
with open(output_parameter_one, 'w') as f:
f.write("{}".format(1234))
import argparse
_parser = argparse.ArgumentParser(prog='Preprocess', description='Dummy Preprocess Step.')
_parser.add_argument("--uri", dest="uri", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--some-int", dest="some_int", type=int, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output-parameter-one", dest="output_parameter_one", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output-dataset-one", dest="output_dataset_one", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = preprocess(**_parsed_args)
- --uri
- '{{$.inputs.parameters[''uri'']}}'
- --some-int
- '{{$.inputs.parameters[''some_int'']}}'
- --output-parameter-one
- '{{$.outputs.parameters[''output_parameter_one''].output_file}}'
- --output-dataset-one
- '{{$.outputs.artifacts[''output_dataset_one''].path}}'
command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
--container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name,
'{{inputs.parameters.pipeline-name}}', --pipeline_run_id, $(WORKFLOW_ID),
--pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}']
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- {name: KFP_V2_IMAGE, value: 'python:3.9'}
- {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"some_int": {"parameterType":
"INT", "parameterValue": "12"}, "uri": {"parameterType": "STRING", "parameterValue":
"uri-to-import"}}, "inputArtifacts": {}, "outputParameters": {"output_parameter_one":
{"parameterType": "INT", "fileOutputPath": "/tmp/outputs/output_parameter_one/data"}},
"outputArtifacts": {"output_dataset_one": {"artifactSchema": "title: kfp.Dataset\ntype:
object\nproperties:\n payload_format:\n type: string\n container_format:\n type:
string", "fileOutputPath": "/tmp/outputs/output_dataset_one/data"}}}'}
envFrom:
- configMapRef: {name: metadata-grpc-configmap, optional: true}
image: python:3.9
volumeMounts:
- {mountPath: /kfp-launcher, name: kfp-launcher}
inputs:
parameters:
- {name: pipeline-name}
- {name: pipeline-output-directory}
outputs:
parameters:
- name: preprocess-output_parameter_one
valueFrom: {path: /tmp/outputs/output_parameter_one/data}
artifacts:
- {name: preprocess-output_dataset_one, path: /tmp/outputs/output_dataset_one/data}
- {name: preprocess-output_parameter_one, path: /tmp/outputs/output_parameter_one/data}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_spec: '{"description": "Dummy Preprocess
Step.", "implementation": {"container": {"args": ["--uri", {"inputValue":
"uri"}, "--some-int", {"inputValue": "some_int"}, "--output-parameter-one",
{"outputPath": "output_parameter_one"}, "--output-dataset-one", {"outputPath":
"output_dataset_one"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
\"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def _make_parent_dirs_and_return_path(file_path: str):\n import os\n os.makedirs(os.path.dirname(file_path),
exist_ok=True)\n return file_path\n\ndef preprocess(uri, some_int, output_parameter_one,\n output_dataset_one):\n ''''''Dummy
Preprocess Step.''''''\n with open(output_dataset_one, ''w'') as f:\n f.write(''Output
dataset'')\n with open(output_parameter_one, ''w'') as f:\n f.write(\"{}\".format(1234))\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Preprocess'', description=''Dummy
Preprocess Step.'')\n_parser.add_argument(\"--uri\", dest=\"uri\", type=str,
required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--some-int\",
dest=\"some_int\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-parameter-one\",
dest=\"output_parameter_one\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-dataset-one\",
dest=\"output_dataset_one\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= preprocess(**_parsed_args)\n"], "image": "python:3.9"}}, "inputs": [{"name":
"uri", "type": "String"}, {"name": "some_int", "type": "Integer"}], "name":
"Preprocess", "outputs": [{"name": "output_parameter_one", "type": "Integer"},
{"name": "output_dataset_one", "type": "Dataset"}]}'
pipelines.kubeflow.org/component_ref: '{}'
pipelines.kubeflow.org/arguments.parameters: '{"some_int": "12", "uri": "uri-to-import"}'
initContainers:
- command: [/bin/mount_launcher.sh]
image: gcr.io/ml-pipeline/kfp-launcher
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- {name: kfp-launcher}
- name: train
container:
args:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def train(dataset,
model,
num_steps = 100):
'''Dummy Training Step.'''
with open(dataset, 'r') as input_file:
input_string = input_file.read()
with open(model, 'w') as output_file:
for i in range(num_steps):
output_file.write("Step {}\n{}\n=====\n".format(i, input_string))
import argparse
_parser = argparse.ArgumentParser(prog='Train', description='Dummy Training Step.')
_parser.add_argument("--dataset", dest="dataset", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--num-steps", dest="num_steps", type=int, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = train(**_parsed_args)
- --dataset
- '{{$.inputs.artifacts[''dataset''].path}}'
- --num-steps
- '{{$.inputs.parameters[''num_steps'']}}'
- --model
- '{{$.outputs.artifacts[''model''].path}}'
command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
--container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}',
--pipeline_run_id, $(WORKFLOW_ID), --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root,
'{{inputs.parameters.pipeline-output-directory}}']
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- {name: KFP_V2_IMAGE, value: 'python:3.7'}
- {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"num_steps": {"parameterType":
"INT", "parameterValue": "{{inputs.parameters.preprocess-output_parameter_one}}"}},
"inputArtifacts": {"dataset": {"fileInputPath": "/tmp/inputs/dataset/data"}},
"outputParameters": {}, "outputArtifacts": {"model": {"artifactSchema":
"title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type:
string\n", "fileOutputPath": "/tmp/outputs/model/data"}}}'}
envFrom:
- configMapRef: {name: metadata-grpc-configmap, optional: true}
image: python:3.7
volumeMounts:
- {mountPath: /kfp-launcher, name: kfp-launcher}
inputs:
parameters:
- {name: pipeline-name}
- {name: pipeline-output-directory}
- {name: preprocess-output_parameter_one}
artifacts:
- {name: preprocess-output_dataset_one, path: /tmp/inputs/dataset/data}
outputs:
artifacts:
- {name: train-model, path: /tmp/outputs/model/data}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_spec: '{"description": "Dummy Training Step.",
"implementation": {"container": {"args": ["--dataset", {"inputPath": "dataset"},
{"if": {"cond": {"isPresent": "num_steps"}, "then": ["--num-steps", {"inputValue":
"num_steps"}]}}, "--model", {"outputPath": "model"}], "command": ["sh",
"-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef train(dataset,\n model,\n num_steps =
100):\n ''''''Dummy Training Step.''''''\n\n with open(dataset, ''r'')
as input_file:\n input_string = input_file.read()\n with open(model,
''w'') as output_file:\n for i in range(num_steps):\n output_file.write(\"Step
{}\\n{}\\n=====\\n\".format(i, input_string))\n\nimport argparse\n_parser
= argparse.ArgumentParser(prog=''Train'', description=''Dummy Training Step.'')\n_parser.add_argument(\"--dataset\",
dest=\"dataset\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--num-steps\",
dest=\"num_steps\", type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--model\",
dest=\"model\", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)\n_parsed_args
= vars(_parser.parse_args())\n\n_outputs = train(**_parsed_args)\n"], "image":
"python:3.7"}}, "inputs": [{"name": "dataset", "type": "Dataset"}, {"default":
"100", "name": "num_steps", "optional": true, "type": "Integer"}], "name":
"Train", "outputs": [{"name": "model", "type": "Model"}]}'
pipelines.kubeflow.org/component_ref: '{}'
pipelines.kubeflow.org/arguments.parameters: '{"num_steps": "{{inputs.parameters.preprocess-output_parameter_one}}"}'
initContainers:
- command: [/bin/mount_launcher.sh]
image: gcr.io/ml-pipeline/kfp-launcher
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- {name: kfp-launcher}
arguments:
parameters:
- {name: pipeline-output-directory, value: 'gs://output-directory/v2-artifacts'}
- {name: pipeline-name, value: my-test-pipeline}
serviceAccountName: pipeline-runner


@ -0,0 +1,129 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for enabling v2-compatible pipelines in v1."""
import collections
import json
from kfp import dsl
from kfp.compiler import _default_transformers
from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.v2 import compiler
from kubernetes import client as k8s_client
_LAUNCHER_CONTAINER = dsl.UserContainer(name="kfp-launcher",
image="gcr.io/ml-pipeline/kfp-launcher",
command="/bin/mount_launcher.sh",
mirror_volume_mounts=True)
def update_op(op: dsl.ContainerOp, pipeline_name: dsl.PipelineParam,
pipeline_root: dsl.PipelineParam) -> None:
"""Updates the passed in Op for running in v2-compatible mode.
Args:
op: The Op to update.
pipeline_name: The name of the pipeline under which `op` runs.
pipeline_root: The root output directory for pipeline artifacts.
"""
# Inject the launcher binary and overwrite the entrypoint.
op.add_init_container(_LAUNCHER_CONTAINER)
op.add_volume(k8s_client.V1Volume(name='kfp-launcher'))
op.add_volume_mount(
k8s_client.V1VolumeMount(name='kfp-launcher', mount_path='/kfp-launcher'))
op.command = [
"/kfp-launcher/launch",
"--mlmd_server_address",
"$(METADATA_GRPC_SERVICE_HOST)",
"--mlmd_server_port",
"$(METADATA_GRPC_SERVICE_PORT)",
"--runtime_info_json",
"$(KFP_V2_RUNTIME_INFO)",
"--container_image",
"$(KFP_V2_IMAGE)",
"--task_name",
op.name,
"--pipeline_name",
pipeline_name,
"--pipeline_run_id",
"$(WORKFLOW_ID)",
"--pipeline_task_id",
"$(KFP_POD_NAME)",
"--pipeline_root",
pipeline_root,
]
# Mount necessary environment variables.
op.apply(_default_transformers.add_kfp_pod_env)
op.container.add_env_variable(
k8s_client.V1EnvVar(name="KFP_V2_IMAGE", value=op.container.image))
config_map_ref = k8s_client.V1ConfigMapEnvSource(
name='metadata-grpc-configmap', optional=True)
op.container.add_env_from(
k8s_client.V1EnvFromSource(config_map_ref=config_map_ref))
op.arguments = list(op.container_spec.command) + list(op.container_spec.args)
runtime_info = {
"inputParameters": collections.OrderedDict(),
"inputArtifacts": collections.OrderedDict(),
"outputParameters": collections.OrderedDict(),
"outputArtifacts": collections.OrderedDict(),
}
component_spec = op.component_spec
for parameter, spec in sorted(
component_spec.input_definitions.parameters.items()):
parameter_info = {
"parameterType":
pipeline_spec_pb2.PrimitiveType.PrimitiveTypeEnum.Name(spec.type),
"parameterValue":
op._parameter_arguments[parameter],
}
runtime_info["inputParameters"][parameter] = parameter_info
for artifact_name, spec in sorted(
component_spec.input_definitions.artifacts.items()):
artifact_info = {"fileInputPath": op.input_artifact_paths[artifact_name]}
runtime_info["inputArtifacts"][artifact_name] = artifact_info
for parameter, spec in sorted(
component_spec.output_definitions.parameters.items()):
parameter_info = {
"parameterType":
pipeline_spec_pb2.PrimitiveType.PrimitiveTypeEnum.Name(spec.type),
"fileOutputPath":
op.file_outputs[parameter],
}
runtime_info["outputParameters"][parameter] = parameter_info
for artifact_name, spec in sorted(
component_spec.output_definitions.artifacts.items()):
# TODO: Assert instance_schema.
artifact_info = {
# Type used to register output artifacts.
"artifactSchema": spec.artifact_type.instance_schema,
# File used to write out the registered artifact ID.
"fileOutputPath": op.file_outputs[artifact_name],
}
runtime_info["outputArtifacts"][artifact_name] = artifact_info
op.container.add_env_variable(
k8s_client.V1EnvVar(name="KFP_V2_RUNTIME_INFO",
value=json.dumps(runtime_info)))
op.pod_annotations['pipelines.kubeflow.org/v2_component'] = "true"
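For a concrete picture of what update_op serializes, this is roughly the dict that ends up in the KFP_V2_RUNTIME_INFO env var for the preprocess step in the golden file above (the artifact schema string is abbreviated here):

runtime_info = {
    "inputParameters": {
        "some_int": {"parameterType": "INT", "parameterValue": "12"},
        "uri": {"parameterType": "STRING", "parameterValue": "uri-to-import"},
    },
    "inputArtifacts": {},
    "outputParameters": {
        "output_parameter_one": {
            "parameterType": "INT",
            "fileOutputPath": "/tmp/outputs/output_parameter_one/data",
        },
    },
    "outputArtifacts": {
        "output_dataset_one": {
            "artifactSchema": "title: kfp.Dataset\n...",  # abbreviated
            "fileOutputPath": "/tmp/outputs/output_dataset_one/data",
        },
    },
}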


@ -0,0 +1,98 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for v2-compatible compiled pipelines."""
import os
import tempfile
from typing import Callable
import unittest
import yaml
from kfp import compiler, components, dsl
from kfp.components import InputPath, OutputPath
def preprocess(uri: str, some_int: int, output_parameter_one: OutputPath(int),
output_dataset_one: OutputPath('Dataset')):
'''Dummy Preprocess Step.'''
with open(output_dataset_one, 'w') as f:
f.write('Output dataset')
with open(output_parameter_one, 'w') as f:
f.write("{}".format(1234))
def train(dataset: InputPath('Dataset'),
model: OutputPath('Model'),
num_steps: int = 100):
'''Dummy Training Step.'''
with open(dataset, 'r') as input_file:
input_string = input_file.read()
with open(model, 'w') as output_file:
for i in range(num_steps):
output_file.write("Step {}\n{}\n=====\n".format(i, input_string))
preprocess_op = components.create_component_from_func(preprocess,
base_image='python:3.9')
train_op = components.create_component_from_func(train)
class TestV2CompatibleModeCompiler(unittest.TestCase):
def setUp(self) -> None:
self._compiler = compiler.Compiler(
mode=dsl.PipelineExecutionMode.V2_COMPATIBLE)
def _assert_compiled_pipeline_equals_golden(self, pipeline_func: Callable,
golden_yaml_filename: str):
compiled_file = os.path.join(tempfile.mkdtemp(), 'workflow.yaml')
self._compiler.compile(pipeline_func, package_path=compiled_file)
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_file = os.path.join(test_data_dir, golden_yaml_filename)
# Uncomment the following to update goldens.
# TODO: place this behind some --update_goldens flag.
# self._compiler.compile(pipeline_func, package_path=golden_file)
with open(golden_file, 'r') as f:
golden = yaml.safe_load(f)
with open(compiled_file, 'r') as f:
compiled = yaml.safe_load(f)
for workflow in golden, compiled:
del workflow['metadata']
for template in workflow['spec']['templates']:
template.pop('metadata', None)
self.maxDiff = None
self.assertDictEqual(golden, compiled)
def test_two_step_pipeline(self):
@dsl.pipeline(pipeline_root='gs://output-directory/v2-artifacts',
name='my-test-pipeline')
def v2_compatible_two_step_pipeline():
preprocess_task = preprocess_op(uri='uri-to-import', some_int=12)
train_task = train_op(
num_steps=preprocess_task.outputs['output_parameter_one'],
dataset=preprocess_task.outputs['output_dataset_one'])
self._assert_compiled_pipeline_equals_golden(
v2_compatible_two_step_pipeline, 'v2_compatible_two_step_pipeline.yaml')
if __name__ == '__main__':
unittest.main()


@ -12,14 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
from ._pipeline import Pipeline, pipeline, get_pipeline_conf, PipelineConf
from ._pipeline import Pipeline, PipelineExecutionMode, pipeline, get_pipeline_conf, PipelineConf
from ._container_op import BaseOp, ContainerOp, InputArgumentPath, UserContainer, Sidecar
from ._resource_op import ResourceOp
from ._volume_op import (
VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM
)
from ._volume_op import VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM
from ._pipeline_volume import PipelineVolume
from ._volume_snapshot_op import VolumeSnapshotOp
from ._ops_group import OpsGroup, ExitHandler, Condition, ParallelFor, SubGraph


@ -199,8 +199,10 @@ class Container(V1Container):
# v2 container_spec
self._container_spec = None
super(Container, self).__init__(
image=image, command=command, args=args, **kwargs)
super(Container, self).__init__(image=image,
command=command,
args=args,
**kwargs)
def _validate_size_string(self, size_string):
"""Validate a given string is valid for memory/ephemeral-storage request or limit."""
@ -642,13 +644,13 @@ class UserContainer(Container):
# NOTE inherits definition from `V1Container` rather than `Container`
# because `Container` has no `name` property.
if hasattr(V1Container, 'swagger_types'):
swagger_types = dict(
**V1Container.swagger_types, mirror_volume_mounts='bool')
swagger_types = dict(**V1Container.swagger_types,
mirror_volume_mounts='bool')
if hasattr(V1Container, 'openapi_types'):
openapi_types = dict(
**V1Container.openapi_types, mirror_volume_mounts='bool')
attribute_map = dict(
**V1Container.attribute_map, mirror_volume_mounts='mirrorVolumeMounts')
openapi_types = dict(**V1Container.openapi_types,
mirror_volume_mounts='bool')
attribute_map = dict(**V1Container.attribute_map,
mirror_volume_mounts='mirrorVolumeMounts')
def __init__(self,
name: str,
@ -657,12 +659,11 @@ class UserContainer(Container):
args: StringOrStringList = None,
mirror_volume_mounts: bool = None,
**kwargs):
super().__init__(
name=name,
image=image,
command=as_string_list(command),
args=as_string_list(args),
**kwargs)
super().__init__(name=name,
image=image,
command=as_string_list(command),
args=as_string_list(args),
**kwargs)
self.mirror_volume_mounts = mirror_volume_mounts
@ -710,13 +711,12 @@ class Sidecar(UserContainer):
args: StringOrStringList = None,
mirror_volume_mounts: bool = None,
**kwargs):
super().__init__(
name=name,
image=image,
command=command,
args=args,
mirror_volume_mounts=mirror_volume_mounts,
**kwargs)
super().__init__(name=name,
image=image,
command=command,
args=args,
mirror_volume_mounts=mirror_volume_mounts,
**kwargs)
def _make_hash_based_id_for_op(op):
@ -1096,11 +1096,10 @@ class ContainerOp(BaseOp):
is_exit_handler: bool = False,
pvolumes: Optional[Dict[str, V1Volume]] = None,
):
super().__init__(
name=name,
init_containers=init_containers,
sidecars=sidecars,
is_exit_handler=is_exit_handler)
super().__init__(name=name,
init_containers=init_containers,
sidecars=sidecars,
is_exit_handler=is_exit_handler)
if (not ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING) and (
'--component_launcher_class_path' not in (arguments or [])):
@ -1150,8 +1149,10 @@ class ContainerOp(BaseOp):
# `container` prop in `io.argoproj.workflow.v1alpha1.Template`
container_kwargs = container_kwargs or {}
self._container = Container(
image=image, args=arguments, command=command, **container_kwargs)
self._container = Container(image=image,
args=arguments,
command=command,
**container_kwargs)
# NOTE for backward compatibility (remove in future?)
# proxy old ContainerOp callables to Container
@ -1185,8 +1186,7 @@ class ContainerOp(BaseOp):
warnings.warn(
'The output_artifact_paths parameter is deprecated since SDK v0.1.32. '
'Use the file_outputs parameter instead. file_outputs now supports '
'outputting big data.',
DeprecationWarning)
'outputting big data.', DeprecationWarning)
# Special handling for the mlpipeline-ui-metadata and mlpipeline-metrics
# outputs that should always be saved as artifacts
@ -1366,8 +1366,7 @@ class _MultipleOutputsError:
def raise_error():
raise RuntimeError(
'This task has multiple outputs. Use `task.outputs[<output name>]` '
'dictionary to refer to the one you need.'
)
'dictionary to refer to the one you need.')
def __getattribute__(self, name):
_MultipleOutputsError.raise_error()

View File

@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
from typing import Callable, Optional, Union
from kubernetes.client.models import V1PodDNSConfig
from kfp.dsl import _container_op
from kfp.dsl import _resource_op
@ -27,18 +29,29 @@ import sys
# pipeline definition.
_pipeline_decorator_handler = None
class PipelineExecutionMode(enum.Enum):
# Compile to Argo YAML without support for metadata-enabled components.
V1_LEGACY = 1
# Compiles to Argo YAML with support for metadata-enabled components.
# Pipelines compiled using this mode aim to be compatible with v2 semantics.
V2_COMPATIBLE = 2
# Compiles to KFP v2 IR for execution using the v2 engine.
# This option is unsupported right now.
V2_ENGINE = 3
def pipeline(name: Optional[str] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None):
def pipeline(
name: Optional[str] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None):
"""Decorator of pipeline functions.
Example
::
@pipeline(
name='my awesome pipeline',
description='Is it really awesome?'
name='my-pipeline',
description='My ML Pipeline.',
pipeline_root='gs://my-bucket/my-output-path'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
@ -168,7 +181,6 @@ class PipelineConf():
def add_op_transformer(self, transformer):
"""Configures the op_transformers which will be applied to all ops in the pipeline.
The ops can be ResourceOp, VolumeOp, or ContainerOp.
Args:
@ -225,7 +237,6 @@ class PipelineConf():
def get_pipeline_conf():
"""Configure the pipeline level setting to the current pipeline
Note: call the function inside the user defined pipeline function.
"""
return Pipeline.get_default_pipeline().conf
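The compiler diff above reads pipeline_root and the human-readable name back off the decorated function via getattr; a small sketch of that hand-off, assuming the decorator stashes them on the function under the attribute names used in the compiler diff:

from kfp import dsl

@dsl.pipeline(name='my-pipeline',
              description='My ML Pipeline.',
              pipeline_root='gs://my-bucket/my-output-path')
def my_pipeline():
    ...

# Read back the values the compiler consumes.
print(getattr(my_pipeline, 'output_directory', None))     # the pipeline_root
print(getattr(my_pipeline, '_component_human_name', ''))  # 'my-pipeline'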


@ -25,6 +25,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: WORKFLOW_ID
valueFrom:
fieldRef:
fieldPath: metadata.labels['workflows.argoproj.io/workflow']
image: library/bash
metadata:
labels: