193 lines
6.8 KiB
Python
193 lines
6.8 KiB
Python
# Copyright 2021 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Utility functions for enabling v2-compatible pipelines in v1."""
|
|
import collections
|
|
import json
|
|
from typing import Optional
|
|
|
|
from kfp import dsl
|
|
from kfp.compiler import _default_transformers
|
|
from kfp.pipeline_spec import pipeline_spec_pb2
|
|
from kfp.v2 import compiler
|
|
from kubernetes import client as k8s_client
|
|
|
|
_DEFAULT_LAUNCHER_IMAGE = "gcr.io/ml-pipeline/kfp-launcher:1.8.2"
|
|
|
|
|
|
def update_op(op: dsl.ContainerOp,
|
|
pipeline_name: dsl.PipelineParam,
|
|
pipeline_root: dsl.PipelineParam,
|
|
launcher_image: Optional[str] = None) -> None:
|
|
"""Updates the passed in Op for running in v2-compatible mode.
|
|
|
|
Args:
|
|
op: The Op to update.
|
|
pipeline_spec: The PipelineSpec for the pipeline under which `op`
|
|
runs.
|
|
pipeline_root: The root output directory for pipeline artifacts.
|
|
launcher_image: An optional launcher image. Useful for tests.
|
|
"""
|
|
op.is_v2 = True
|
|
# Inject the launcher binary and overwrite the entrypoint.
|
|
image_name = launcher_image or _DEFAULT_LAUNCHER_IMAGE
|
|
launcher_container = dsl.UserContainer(
|
|
name="kfp-launcher",
|
|
image=image_name,
|
|
command=["launcher", "--copy", "/kfp-launcher/launch"],
|
|
mirror_volume_mounts=True)
|
|
|
|
op.add_init_container(launcher_container)
|
|
op.add_volume(k8s_client.V1Volume(name='kfp-launcher'))
|
|
op.add_volume_mount(
|
|
k8s_client.V1VolumeMount(
|
|
name='kfp-launcher', mount_path='/kfp-launcher'))
|
|
|
|
# op.command + op.args will have the following sections:
|
|
# 1. args passed to kfp-launcher
|
|
# 2. a separator "--"
|
|
# 3. parameters in format "key1=value1", "key2=value2", ...
|
|
# 4. a separator "--" as end of arguments passed to launcher
|
|
# 5. (start of op.args) arguments of the original user program command + args
|
|
#
|
|
# example:
|
|
# - command:
|
|
# - /kfp-launcher/launch
|
|
# - '--mlmd_server_address'
|
|
# - $(METADATA_GRPC_SERVICE_HOST)
|
|
# - '--mlmd_server_port'
|
|
# - $(METADATA_GRPC_SERVICE_PORT)
|
|
# - ... # more launcher params
|
|
# - '--pipeline_task_id'
|
|
# - $(KFP_POD_NAME)
|
|
# - '--pipeline_root'
|
|
# - ''
|
|
# - '--' # start of parameter values
|
|
# - first=first
|
|
# - second=second
|
|
# - '--' # start of user command and args
|
|
# args:
|
|
# - sh
|
|
# - '-ec'
|
|
# - |
|
|
# program_path=$(mktemp)
|
|
# printf "%s" "$0" > "$program_path"
|
|
# python3 -u "$program_path" "$@"
|
|
# - >
|
|
# import json
|
|
# import xxx
|
|
# ...
|
|
op.command = [
|
|
"/kfp-launcher/launch",
|
|
"--mlmd_server_address",
|
|
"$(METADATA_GRPC_SERVICE_HOST)",
|
|
"--mlmd_server_port",
|
|
"$(METADATA_GRPC_SERVICE_PORT)",
|
|
"--runtime_info_json",
|
|
"$(KFP_V2_RUNTIME_INFO)",
|
|
"--container_image",
|
|
"$(KFP_V2_IMAGE)",
|
|
"--task_name",
|
|
op.name,
|
|
"--pipeline_name",
|
|
pipeline_name,
|
|
"--run_id",
|
|
"$(KFP_RUN_ID)",
|
|
"--run_resource",
|
|
"workflows.argoproj.io/$(WORKFLOW_ID)",
|
|
"--namespace",
|
|
"$(KFP_NAMESPACE)",
|
|
"--pod_name",
|
|
"$(KFP_POD_NAME)",
|
|
"--pod_uid",
|
|
"$(KFP_POD_UID)",
|
|
"--pipeline_root",
|
|
pipeline_root,
|
|
"--enable_caching",
|
|
"$(ENABLE_CACHING)",
|
|
]
|
|
|
|
# Mount necessary environment variables.
|
|
op.apply(_default_transformers.add_kfp_pod_env)
|
|
op.container.add_env_variable(
|
|
k8s_client.V1EnvVar(name="KFP_V2_IMAGE", value=op.container.image))
|
|
|
|
config_map_ref = k8s_client.V1ConfigMapEnvSource(
|
|
name='metadata-grpc-configmap', optional=True)
|
|
op.container.add_env_from(
|
|
k8s_client.V1EnvFromSource(config_map_ref=config_map_ref))
|
|
|
|
op.arguments = list(op.container_spec.command) + list(
|
|
op.container_spec.args)
|
|
|
|
runtime_info = {
|
|
"inputParameters": collections.OrderedDict(),
|
|
"inputArtifacts": collections.OrderedDict(),
|
|
"outputParameters": collections.OrderedDict(),
|
|
"outputArtifacts": collections.OrderedDict(),
|
|
}
|
|
|
|
op.command += ["--"]
|
|
component_spec = op.component_spec
|
|
for parameter, spec in sorted(
|
|
component_spec.input_definitions.parameters.items()):
|
|
parameter_info = {
|
|
"type":
|
|
pipeline_spec_pb2.PrimitiveType.PrimitiveTypeEnum.Name(spec.type
|
|
),
|
|
}
|
|
op.command += [f"{parameter}={op._parameter_arguments[parameter]}"]
|
|
runtime_info["inputParameters"][parameter] = parameter_info
|
|
op.command += ["--"]
|
|
|
|
for artifact_name, spec in sorted(
|
|
component_spec.input_definitions.artifacts.items()):
|
|
artifact_info = {
|
|
"metadataPath": op.input_artifact_paths[artifact_name],
|
|
"schemaTitle": spec.artifact_type.schema_title,
|
|
"instanceSchema": spec.artifact_type.instance_schema,
|
|
"schemaVersion": spec.artifact_type.schema_version,
|
|
}
|
|
runtime_info["inputArtifacts"][artifact_name] = artifact_info
|
|
|
|
for parameter, spec in sorted(
|
|
component_spec.output_definitions.parameters.items()):
|
|
parameter_info = {
|
|
"type":
|
|
pipeline_spec_pb2.PrimitiveType.PrimitiveTypeEnum.Name(spec.type
|
|
),
|
|
"path":
|
|
op.file_outputs[parameter],
|
|
}
|
|
runtime_info["outputParameters"][parameter] = parameter_info
|
|
|
|
for artifact_name, spec in sorted(
|
|
component_spec.output_definitions.artifacts.items()):
|
|
# TODO: Assert instance_schema.
|
|
artifact_info = {
|
|
# Type used to register output artifacts.
|
|
"schemaTitle": spec.artifact_type.schema_title,
|
|
"instanceSchema": spec.artifact_type.instance_schema,
|
|
"schemaVersion": spec.artifact_type.schema_version,
|
|
# File used to write out the registered artifact ID.
|
|
"metadataPath": op.file_outputs[artifact_name],
|
|
}
|
|
runtime_info["outputArtifacts"][artifact_name] = artifact_info
|
|
|
|
op.container.add_env_variable(
|
|
k8s_client.V1EnvVar(
|
|
name="KFP_V2_RUNTIME_INFO", value=json.dumps(runtime_info)))
|
|
|
|
op.pod_annotations['pipelines.kubeflow.org/v2_component'] = "true"
|
|
op.pod_labels['pipelines.kubeflow.org/v2_component'] = "true"
|