138 lines
5.0 KiB
Python
138 lines
5.0 KiB
Python
# Copyright 2021 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Utilities functions KFP DSL."""
|
|
|
|
import re
|
|
from typing import Callable, List, Optional, Union
|
|
|
|
from kfp.deprecated.components import _structures
|
|
from kfp.pipeline_spec import pipeline_spec_pb2
|
|
|
|
# Prefix that sanitize_component_name() puts in front of a sanitized name.
_COMPONENT_NAME_PREFIX = 'comp-'
# Prefix that sanitize_executor_label() puts in front of a sanitized label.
_EXECUTOR_LABEL_PREFIX = 'exec-'

# TODO: Support all declared types in
# components._structures.CommandlineArgumentType
# Types accepted for a single command-line entry by resolve_cmd_lines().
_CommandlineArgumentType = Union[
    str,
    int,
    float,
    _structures.InputValuePlaceholder,
    _structures.InputPathPlaceholder,
    _structures.OutputPathPlaceholder,
    _structures.InputUriPlaceholder,
    _structures.OutputUriPlaceholder,
    _structures.ExecutorInputPlaceholder,
]
|
|
|
|
|
|
def sanitize_component_name(name: str) -> str:
    """Builds a component name from an arbitrary string.

    Args:
      name: The raw name to sanitize.

    Returns:
      The result of `_sanitize_name(name)` with `_COMPONENT_NAME_PREFIX`
      prepended.
    """
    sanitized = _sanitize_name(name)
    return _COMPONENT_NAME_PREFIX + sanitized
|
|
|
|
|
|
def sanitize_task_name(name: str) -> str:
    """Builds a task name from an arbitrary string.

    Args:
      name: The raw name to sanitize.

    Returns:
      The result of `_sanitize_name(name)`; no prefix is added.
    """
    sanitized = _sanitize_name(name)
    return sanitized
|
|
|
|
|
|
def sanitize_executor_label(label: str) -> str:
    """Builds an executor label from an arbitrary string.

    Args:
      label: The raw label to sanitize.

    Returns:
      The result of `_sanitize_name(label)` with `_EXECUTOR_LABEL_PREFIX`
      prepended.
    """
    sanitized = _sanitize_name(label)
    return _EXECUTOR_LABEL_PREFIX + sanitized
|
|
|
|
|
|
def _sanitize_name(name: str) -> str:
|
|
"""Sanitizes name to comply with IR naming convention.
|
|
|
|
The sanitized name contains only lower case alphanumeric characters
|
|
and dashes.
|
|
"""
|
|
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-',
|
|
name.lower())).lstrip('-').rstrip('-')
|
|
|
|
|
|
def get_value(value: Union[str, int, float]) -> pipeline_spec_pb2.Value:
    """Wraps a Python scalar in a `pipeline_spec_pb2.Value` proto.

    Args:
      value: A str, int or float. Note that `bool` is a subclass of `int`
        in Python, so a bool is stored through the int branch.

    Returns:
      A new `pipeline_spec_pb2.Value` with `string_value`, `int_value` or
      `double_value` set according to the type of `value`.

    Raises:
      TypeError: If `value` is not a str, int or float.
    """
    proto = pipeline_spec_pb2.Value()

    if isinstance(value, str):
        proto.string_value = value
        return proto

    if isinstance(value, int):
        proto.int_value = value
        return proto

    if isinstance(value, float):
        proto.double_value = value
        return proto

    raise TypeError(
        'Got unexpected type %s for value %s. Currently only support str, int '
        'and float.' % (type(value), value))
|
|
|
|
|
|
# TODO: share with dsl._component_bridge
|
|
def _input_artifact_uri_placeholder(input_key: str) -> str:
|
|
return "{{{{$.inputs.artifacts['{}'].uri}}}}".format(input_key)
|
|
|
|
|
|
def _input_artifact_path_placeholder(input_key: str) -> str:
|
|
return "{{{{$.inputs.artifacts['{}'].path}}}}".format(input_key)
|
|
|
|
|
|
def _input_parameter_placeholder(input_key: str) -> str:
|
|
return "{{{{$.inputs.parameters['{}']}}}}".format(input_key)
|
|
|
|
|
|
def _output_artifact_uri_placeholder(output_key: str) -> str:
|
|
return "{{{{$.outputs.artifacts['{}'].uri}}}}".format(output_key)
|
|
|
|
|
|
def _output_artifact_path_placeholder(output_key: str) -> str:
|
|
return "{{{{$.outputs.artifacts['{}'].path}}}}".format(output_key)
|
|
|
|
|
|
def _output_parameter_path_placeholder(output_key: str) -> str:
|
|
return "{{{{$.outputs.parameters['{}'].output_file}}}}".format(output_key)
|
|
|
|
|
|
def _executor_input_placeholder() -> str:
|
|
return "{{{{$}}}}"
|
|
|
|
|
|
def resolve_cmd_lines(cmds: Optional[List[_CommandlineArgumentType]],
                      is_output_parameter: Callable[[str], bool]) -> None:
    """Resolves a list of commands/args in place.

    Every entry of `cmds` is overwritten with its resolved form: `None`
    stays `None`, str/int/float entries become `str(entry)`, and placeholder
    objects become the matching placeholder string.

    Args:
      cmds: The command-line entries to resolve. The list is mutated; when
        it is `None` or empty the function returns without doing anything.
      is_output_parameter: Called with the output name of an
        `OutputPathPlaceholder`; a truthy result selects the output-parameter
        file placeholder, otherwise the output-artifact path placeholder is
        used.

    Raises:
      TypeError: If an entry is none of the supported types. Entries before
        the offending one have already been replaced at that point.
    """
    if not cmds:
        return

    def _resolve_cmd(cmd: Optional[_CommandlineArgumentType]) -> Optional[str]:
        """Resolves a single command line cmd/arg."""
        if cmd is None:
            return None
        if isinstance(cmd, (str, float, int)):
            return str(cmd)
        if isinstance(cmd, _structures.InputValuePlaceholder):
            return _input_parameter_placeholder(cmd.input_name)
        if isinstance(cmd, _structures.InputPathPlaceholder):
            return _input_artifact_path_placeholder(cmd.input_name)
        if isinstance(cmd, _structures.InputUriPlaceholder):
            return _input_artifact_uri_placeholder(cmd.input_name)
        if isinstance(cmd, _structures.OutputPathPlaceholder):
            # The same placeholder type serves both output parameters and
            # output artifacts; the caller-supplied predicate disambiguates.
            if is_output_parameter(cmd.output_name):
                return _output_parameter_path_placeholder(cmd.output_name)
            return _output_artifact_path_placeholder(cmd.output_name)
        if isinstance(cmd, _structures.OutputUriPlaceholder):
            return _output_artifact_uri_placeholder(cmd.output_name)
        if isinstance(cmd, _structures.ExecutorInputPlaceholder):
            return _executor_input_placeholder()
        # Wrap in a 1-tuple: a bare tuple `cmd` would otherwise be unpacked
        # as the format arguments and produce a misleading formatting error.
        raise TypeError('Got unexpected placeholder type for %s' % (cmd,))

    for position, entry in enumerate(cmds):
        cmds[position] = _resolve_cmd(entry)
|