chore(sdk): Allow the launcher image to be configurable. (#5360)

* Allow the launcher image to be configurable. This is especially useful
for tests.

* Address PR comments.
Author: Ajay Gopinathan, 2021-03-23 08:27:44 -07:00 (committed by GitHub)
parent 9a03151bbd
commit a1d453af21
5 changed files with 333 additions and 28 deletions
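
With this change, the launcher image used for v2-compatible pipelines becomes a keyword argument on both the Compiler and the client. A minimal usage sketch of the compiler side (the echo component, pipeline name, bucket path, output file, and 'my-custom-launcher-image' are placeholders for illustration, not part of this commit):

import kfp.dsl as dsl
from kfp import compiler, components


def echo(text: str):
    """Toy component, used only for illustration."""
    print(text)

echo_op = components.create_component_from_func(echo)


@dsl.pipeline(pipeline_root='gs://my-bucket/v2-artifacts',
              name='launcher-image-demo')
def demo_pipeline():
    echo_op(text='hello')


# Compile in v2-compatible mode, overriding the default kfp-launcher image.
compiler.Compiler(
    mode=dsl.PipelineExecutionMode.V2_COMPATIBLE,
    launcher_image='my-custom-launcher-image',
).compile(pipeline_func=demo_pipeline, package_path='demo_pipeline.yaml')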


@@ -690,7 +690,8 @@ class Client(object):
experiment_name: Optional[str] = None,
pipeline_conf: Optional[dsl.PipelineConf] = None,
namespace: Optional[str] = None,
mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY):
mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY,
launcher_image: Optional[str] = None):
"""Runs pipeline on KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
@@ -707,13 +708,16 @@ class Client(object):
For multi user, input a namespace where the user is authorized
mode: The PipelineExecutionMode to use when compiling and running
pipeline_func.
launcher_image: The launcher image to use if the mode is specified as
PipelineExecutionMode.V2_COMPATIBLE. Should only be needed for tests
or custom deployments right now.
"""
#TODO: Check arguments against the pipeline function
pipeline_name = pipeline_func.__name__
run_name = run_name or pipeline_name + ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
with tempfile.TemporaryDirectory() as tmpdir:
pipeline_package_path = os.path.join(tmpdir, 'pipeline.yaml')
compiler.Compiler(mode=mode).compile(
compiler.Compiler(mode=mode, launcher_image=launcher_image).compile(
pipeline_func=pipeline_func,
package_path=pipeline_package_path,
pipeline_conf=pipeline_conf)
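
The same override is exposed to end users through the client call shown above; a rough sketch (the host URL is a placeholder, and demo_pipeline refers to the pipeline from the earlier compile sketch):

import kfp
import kfp.dsl as dsl

client = kfp.Client(host='http://localhost:8080')  # placeholder KFP endpoint
client.create_run_from_pipeline_func(
    pipeline_func=demo_pipeline,   # any @dsl.pipeline function
    arguments={},
    mode=dsl.PipelineExecutionMode.V2_COMPATIBLE,
    launcher_image='my-custom-launcher-image',  # forwarded to the Compiler
)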


@@ -57,7 +57,16 @@ class Compiler(object):
def __init__(
self,
mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY):
mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY,
launcher_image: Optional[str] = None):
"""Creates a KFP compiler for compiling pipeline functions for execution.
Args:
mode: The pipeline execution mode to use.
launcher_image: Configurable image for KFP launcher to use. Only applies
when `mode == dsl.PipelineExecutionMode.V2_COMPATIBLE`. Should only be
needed for tests or custom deployments right now.
"""
if mode == dsl.PipelineExecutionMode.V2_ENGINE:
raise ValueError('V2_ENGINE execution mode is not supported yet.')
@@ -65,6 +74,7 @@ class Compiler(object):
warnings.warn('V2_COMPATIBLE execution mode is still under development.'
' Pipelines may not work as expected.')
self._mode = mode
self._launcher_image = launcher_image
self._pipeline_name_param: Optional[dsl.PipelineParam] = None
self._pipeline_root_param: Optional[dsl.PipelineParam] = None
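
A small sketch of the constructor behaviour above: V2_ENGINE is rejected, while V2_COMPATIBLE warns that it is still under development and simply stores the optional launcher image for later use (the image name is a placeholder):

import kfp.dsl as dsl
from kfp import compiler

# V2_ENGINE is rejected outright.
try:
    compiler.Compiler(mode=dsl.PipelineExecutionMode.V2_ENGINE)
except ValueError as err:
    print(err)  # V2_ENGINE execution mode is not supported yet.

# Emits a UserWarning that V2_COMPATIBLE is still under development, then
# keeps the launcher image override for later injection into each op.
c = compiler.Compiler(mode=dsl.PipelineExecutionMode.V2_COMPATIBLE,
                      launcher_image='my-custom-launcher-image')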
@@ -664,7 +674,8 @@ class Compiler(object):
if self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE:
v2_compat.update_op(op,
pipeline_name=self._pipeline_name_param,
pipeline_root=self._pipeline_root_param)
pipeline_root=self._pipeline_root_param,
launcher_image=self._launcher_image)
templates.extend(op_to_templates_handler(op))
return templates
@@ -1092,15 +1103,15 @@ Please create a new issue at https://github.com/kubeflow/pipelines/issues attach
has_working_argo_lint = False
try:
has_working_argo_lint = _run_argo_lint("""
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: docker/whalesay:latest""")
except:
warnings.warn("Cannot validate the compiled workflow. Found the argo program in PATH, but it's not usable. argo v2.4.3 should work.")


@@ -0,0 +1,265 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: my-test-pipeline-with-custom-launcher-
annotations:
pipelines.kubeflow.org/kfp_sdk_version: 1.5.0-rc.0
pipelines.kubeflow.org/pipeline_compilation_time: '2021-03-22T18:04:55.337209'
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "gs://output-directory/v2-artifacts",
"name": "pipeline-output-directory"}, {"default": "my-test-pipeline-with-custom-launcher",
"name": "pipeline-name"}], "name": "my-test-pipeline-with-custom-launcher"}'
pipelines.kubeflow.org/v2_pipeline: "true"
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.5.0-rc.0}
spec:
entrypoint: my-test-pipeline-with-custom-launcher
templates:
- name: my-test-pipeline-with-custom-launcher
inputs:
parameters:
- {name: pipeline-name}
- {name: pipeline-output-directory}
dag:
tasks:
- name: preprocess
template: preprocess
arguments:
parameters:
- {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- name: train
template: train
dependencies: [preprocess]
arguments:
parameters:
- {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: preprocess-output_parameter_one, value: '{{tasks.preprocess.outputs.parameters.preprocess-output_parameter_one}}'}
artifacts:
- {name: preprocess-output_dataset_one, from: '{{tasks.preprocess.outputs.artifacts.preprocess-output_dataset_one}}'}
- name: preprocess
container:
args:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def preprocess(uri, some_int, output_parameter_one,
output_dataset_one):
'''Dummy Preprocess Step.'''
with open(output_dataset_one, 'w') as f:
f.write('Output dataset')
with open(output_parameter_one, 'w') as f:
f.write("{}".format(1234))
import argparse
_parser = argparse.ArgumentParser(prog='Preprocess', description='Dummy Preprocess Step.')
_parser.add_argument("--uri", dest="uri", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--some-int", dest="some_int", type=int, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output-parameter-one", dest="output_parameter_one", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output-dataset-one", dest="output_dataset_one", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = preprocess(**_parsed_args)
- --uri
- '{{$.inputs.parameters[''uri'']}}'
- --some-int
- '{{$.inputs.parameters[''some_int'']}}'
- --output-parameter-one
- '{{$.outputs.parameters[''output_parameter_one''].output_file}}'
- --output-dataset-one
- '{{$.outputs.artifacts[''output_dataset_one''].path}}'
command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
--container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name,
'{{inputs.parameters.pipeline-name}}', --pipeline_run_id, $(WORKFLOW_ID),
--pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}']
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- {name: KFP_V2_IMAGE, value: 'python:3.9'}
- {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"some_int": {"parameterType":
"INT", "parameterValue": "12"}, "uri": {"parameterType": "STRING", "parameterValue":
"uri-to-import"}}, "inputArtifacts": {}, "outputParameters": {"output_parameter_one":
{"parameterType": "INT", "fileOutputPath": "/tmp/outputs/output_parameter_one/data"}},
"outputArtifacts": {"output_dataset_one": {"artifactSchema": "title: kfp.Dataset\ntype:
object\nproperties:\n payload_format:\n type: string\n container_format:\n type:
string\n", "fileOutputPath": "/tmp/outputs/output_dataset_one/data"}}}'}
envFrom:
- configMapRef: {name: metadata-grpc-configmap, optional: true}
image: python:3.9
volumeMounts:
- {mountPath: /kfp-launcher, name: kfp-launcher}
inputs:
parameters:
- {name: pipeline-name}
- {name: pipeline-output-directory}
outputs:
parameters:
- name: preprocess-output_parameter_one
valueFrom: {path: /tmp/outputs/output_parameter_one/data}
artifacts:
- {name: preprocess-output_dataset_one, path: /tmp/outputs/output_dataset_one/data}
- {name: preprocess-output_parameter_one, path: /tmp/outputs/output_parameter_one/data}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_spec: '{"description": "Dummy Preprocess
Step.", "implementation": {"container": {"args": ["--uri", {"inputValue":
"uri"}, "--some-int", {"inputValue": "some_int"}, "--output-parameter-one",
{"outputPath": "output_parameter_one"}, "--output-dataset-one", {"outputPath":
"output_dataset_one"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
\"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def _make_parent_dirs_and_return_path(file_path: str):\n import os\n os.makedirs(os.path.dirname(file_path),
exist_ok=True)\n return file_path\n\ndef preprocess(uri, some_int, output_parameter_one,\n output_dataset_one):\n ''''''Dummy
Preprocess Step.''''''\n with open(output_dataset_one, ''w'') as f:\n f.write(''Output
dataset'')\n with open(output_parameter_one, ''w'') as f:\n f.write(\"{}\".format(1234))\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Preprocess'', description=''Dummy
Preprocess Step.'')\n_parser.add_argument(\"--uri\", dest=\"uri\", type=str,
required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--some-int\",
dest=\"some_int\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-parameter-one\",
dest=\"output_parameter_one\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-dataset-one\",
dest=\"output_dataset_one\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= preprocess(**_parsed_args)\n"], "image": "python:3.9"}}, "inputs": [{"name":
"uri", "type": "String"}, {"name": "some_int", "type": "Integer"}], "name":
"Preprocess", "outputs": [{"name": "output_parameter_one", "type": "Integer"},
{"name": "output_dataset_one", "type": "Dataset"}]}'
pipelines.kubeflow.org/component_ref: '{}'
pipelines.kubeflow.org/arguments.parameters: '{"some_int": "12", "uri": "uri-to-import"}'
initContainers:
- command: [/bin/mount_launcher.sh]
image: my-custom-image
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- {name: kfp-launcher}
- name: train
container:
args:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def train(dataset,
model,
num_steps = 100):
'''Dummy Training Step.'''
with open(dataset, 'r') as input_file:
input_string = input_file.read()
with open(model, 'w') as output_file:
for i in range(num_steps):
output_file.write("Step {}\n{}\n=====\n".format(i, input_string))
import argparse
_parser = argparse.ArgumentParser(prog='Train', description='Dummy Training Step.')
_parser.add_argument("--dataset", dest="dataset", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--num-steps", dest="num_steps", type=int, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = train(**_parsed_args)
- --dataset
- '{{$.inputs.artifacts[''dataset''].path}}'
- --num-steps
- '{{$.inputs.parameters[''num_steps'']}}'
- --model
- '{{$.outputs.artifacts[''model''].path}}'
command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
--container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}',
--pipeline_run_id, $(WORKFLOW_ID), --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root,
'{{inputs.parameters.pipeline-output-directory}}']
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- {name: KFP_V2_IMAGE, value: 'python:3.7'}
- {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"num_steps": {"parameterType":
"INT", "parameterValue": "{{inputs.parameters.preprocess-output_parameter_one}}"}},
"inputArtifacts": {"dataset": {"fileInputPath": "/tmp/inputs/dataset/data"}},
"outputParameters": {}, "outputArtifacts": {"model": {"artifactSchema":
"title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type:
string\n", "fileOutputPath": "/tmp/outputs/model/data"}}}'}
envFrom:
- configMapRef: {name: metadata-grpc-configmap, optional: true}
image: python:3.7
volumeMounts:
- {mountPath: /kfp-launcher, name: kfp-launcher}
inputs:
parameters:
- {name: pipeline-name}
- {name: pipeline-output-directory}
- {name: preprocess-output_parameter_one}
artifacts:
- {name: preprocess-output_dataset_one, path: /tmp/inputs/dataset/data}
outputs:
artifacts:
- {name: train-model, path: /tmp/outputs/model/data}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_spec: '{"description": "Dummy Training Step.",
"implementation": {"container": {"args": ["--dataset", {"inputPath": "dataset"},
{"if": {"cond": {"isPresent": "num_steps"}, "then": ["--num-steps", {"inputValue":
"num_steps"}]}}, "--model", {"outputPath": "model"}], "command": ["sh",
"-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef train(dataset,\n model,\n num_steps =
100):\n ''''''Dummy Training Step.''''''\n\n with open(dataset, ''r'')
as input_file:\n input_string = input_file.read()\n with open(model,
''w'') as output_file:\n for i in range(num_steps):\n output_file.write(\"Step
{}\\n{}\\n=====\\n\".format(i, input_string))\n\nimport argparse\n_parser
= argparse.ArgumentParser(prog=''Train'', description=''Dummy Training Step.'')\n_parser.add_argument(\"--dataset\",
dest=\"dataset\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--num-steps\",
dest=\"num_steps\", type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--model\",
dest=\"model\", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)\n_parsed_args
= vars(_parser.parse_args())\n\n_outputs = train(**_parsed_args)\n"], "image":
"python:3.7"}}, "inputs": [{"name": "dataset", "type": "Dataset"}, {"default":
"100", "name": "num_steps", "optional": true, "type": "Integer"}], "name":
"Train", "outputs": [{"name": "model", "type": "Model"}]}'
pipelines.kubeflow.org/component_ref: '{}'
pipelines.kubeflow.org/arguments.parameters: '{"num_steps": "{{inputs.parameters.preprocess-output_parameter_one}}"}'
initContainers:
- command: [/bin/mount_launcher.sh]
image: my-custom-image
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- {name: kfp-launcher}
arguments:
parameters:
- {name: pipeline-output-directory, value: 'gs://output-directory/v2-artifacts'}
- {name: pipeline-name, value: my-test-pipeline-with-custom-launcher}
serviceAccountName: pipeline-runner


@@ -14,6 +14,7 @@
"""Utility functions for enabling v2-compatible pipelines in v1."""
import collections
import json
from typing import Optional
from kfp import dsl
from kfp.compiler import _default_transformers
@@ -22,14 +23,13 @@ from kfp.v2 import compiler
from kubernetes import client as k8s_client
_LAUNCHER_CONTAINER = dsl.UserContainer(name="kfp-launcher",
image="gcr.io/ml-pipeline/kfp-launcher",
command="/bin/mount_launcher.sh",
mirror_volume_mounts=True)
_DEFAULT_LAUNCHER_IMAGE = "gcr.io/ml-pipeline/kfp-launcher"
def update_op(op: dsl.ContainerOp, pipeline_name: dsl.PipelineParam,
pipeline_root: dsl.PipelineParam) -> None:
def update_op(op: dsl.ContainerOp,
pipeline_name: dsl.PipelineParam,
pipeline_root: dsl.PipelineParam,
launcher_image: Optional[str] = None) -> None:
"""Updates the passed in Op for running in v2-compatible mode.
Args:
@@ -37,9 +37,16 @@ def update_op(op: dsl.ContainerOp, pipeline_name: dsl.PipelineParam,
pipeline_name: A PipelineParam holding the name of the pipeline under which `op`
runs.
pipeline_root: The root output directory for pipeline artifacts.
launcher_image: An optional launcher image. Useful for tests.
"""
# Inject the launcher binary and overwrite the entrypoint.
op.add_init_container(_LAUNCHER_CONTAINER)
image_name = launcher_image or _DEFAULT_LAUNCHER_IMAGE
launcher_container = dsl.UserContainer(name="kfp-launcher",
image=image_name,
command="/bin/mount_launcher.sh",
mirror_volume_mounts=True)
op.add_init_container(launcher_container)
op.add_volume(k8s_client.V1Volume(name='kfp-launcher'))
op.add_volume_mount(
k8s_client.V1VolumeMount(name='kfp-launcher', mount_path='/kfp-launcher'))
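
Since update_op now builds the launcher init container from the resolved image name, the override shows up directly in the compiled Argo workflow. A quick inspection sketch, reusing the demo_pipeline.yaml produced by the earlier compile example:

import yaml

with open('demo_pipeline.yaml') as f:  # output of the earlier Compiler sketch
    workflow = yaml.safe_load(f)

for template in workflow['spec']['templates']:
    for init_container in template.get('initContainers', []):
        if init_container.get('name') == 'kfp-launcher':
            # Prints 'my-custom-launcher-image' when the override is set,
            # or the default 'gcr.io/ml-pipeline/kfp-launcher' otherwise.
            print(template['name'], init_container['image'])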


@@ -51,20 +51,18 @@ train_op = components.create_component_from_func(train)
class TestV2CompatibleModeCompiler(unittest.TestCase):
def setUp(self) -> None:
self._compiler = compiler.Compiler(
mode=dsl.PipelineExecutionMode.V2_COMPATIBLE)
def _assert_compiled_pipeline_equals_golden(self, pipeline_func: Callable,
def _assert_compiled_pipeline_equals_golden(self,
kfp_compiler: compiler.Compiler,
pipeline_func: Callable,
golden_yaml_filename: str):
compiled_file = os.path.join(tempfile.mkdtemp(), 'workflow.yaml')
self._compiler.compile(pipeline_func, package_path=compiled_file)
kfp_compiler.compile(pipeline_func, package_path=compiled_file)
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_file = os.path.join(test_data_dir, golden_yaml_filename)
# Uncomment the following to update goldens.
# TODO: place this behind some --update_goldens flag.
# self._compiler.compile(pipeline_func, package_path=golden_file)
# kfp_compiler.compile(pipeline_func, package_path=golden_file)
with open(golden_file, 'r') as f:
golden = yaml.safe_load(f)
@@ -90,8 +88,28 @@ class TestV2CompatibleModeCompiler(unittest.TestCase):
num_steps=preprocess_task.outputs['output_parameter_one'],
dataset=preprocess_task.outputs['output_dataset_one'])
kfp_compiler = compiler.Compiler(
mode=dsl.PipelineExecutionMode.V2_COMPATIBLE)
self._assert_compiled_pipeline_equals_golden(
v2_compatible_two_step_pipeline, 'v2_compatible_two_step_pipeline.yaml')
kfp_compiler, v2_compatible_two_step_pipeline,
'v2_compatible_two_step_pipeline.yaml')
def test_custom_launcher(self):
@dsl.pipeline(pipeline_root='gs://output-directory/v2-artifacts',
name='my-test-pipeline-with-custom-launcher')
def v2_compatible_two_step_pipeline():
preprocess_task = preprocess_op(uri='uri-to-import', some_int=12)
train_task = train_op(
num_steps=preprocess_task.outputs['output_parameter_one'],
dataset=preprocess_task.outputs['output_dataset_one'])
kfp_compiler = compiler.Compiler(
mode=dsl.PipelineExecutionMode.V2_COMPATIBLE,
launcher_image='my-custom-image')
self._assert_compiled_pipeline_equals_golden(
kfp_compiler, v2_compatible_two_step_pipeline,
'v2_compatible_two_step_pipeline_with_custom_launcher.yaml')
if __name__ == '__main__':