feat(sdk): add support for IfPresentPlaceholder and ConcatPlaceholder strings (#7795)

* move placeholders to placeholders module; implement if-present and concat placeholders

* update module name and method names throughout codebase

* rename placeholder abc

* add back compat support for CEL-style concat string

* add copyright

* add docstrings; clean up

* remove duplicate code
This commit is contained in:
Connor McCarthy 2022-06-09 01:42:00 -06:00 committed by GitHub
parent 79b33356f3
commit de0b824be1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 934 additions and 630 deletions

View File

@ -17,15 +17,17 @@ import collections
import json
import re
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
import warnings
import kfp
from google.protobuf import json_format
from google.protobuf import struct_pb2
import kfp
from kfp import dsl
from kfp.compiler import pipeline_spec_builder as builder
from kfp.components import for_loop
from kfp.components import pipeline_channel
from kfp.components import pipeline_task
from kfp.components import placeholders
from kfp.components import structures
from kfp.components import tasks_group
from kfp.components import utils
@ -33,6 +35,7 @@ from kfp.components import utils as component_utils
from kfp.components.types import artifact_types
from kfp.components.types import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
import yaml
GroupOrTaskType = Union[tasks_group.TasksGroup, pipeline_task.PipelineTask]
@ -247,8 +250,8 @@ def build_task_spec_for_task(
'{} and compiler injected input name {}'.format(
existing_input_name, additional_input_name))
additional_input_placeholder = structures.InputValuePlaceholder(
additional_input_name).to_placeholder()
additional_input_placeholder = placeholders.InputValuePlaceholder(
additional_input_name).to_placeholder_string()
input_value = input_value.replace(channel.pattern,
additional_input_placeholder)

View File

@ -13,13 +13,17 @@
# limitations under the License.
"""Tests for kfp.compiler.pipeline_spec_builder."""
import os
import tempfile
import unittest
from absl.testing import parameterized
from google.protobuf import json_format
from google.protobuf import struct_pb2
from kfp.compiler import pipeline_spec_builder
from kfp.components import pipeline_channel
from kfp.pipeline_spec import pipeline_spec_pb2
import yaml
class PipelineSpecBuilderTest(parameterized.TestCase):
@ -199,5 +203,11 @@ class TestValidatePipelineName(parameterized.TestCase):
pipeline_spec_builder.validate_pipeline_name('my_pipeline')
def pipeline_spec_from_file(filepath: str) -> str:
with open(filepath, 'r') as f:
dictionary = yaml.safe_load(f)
return json_format.ParseDict(dictionary, pipeline_spec_pb2.PipelineSpec())
if __name__ == '__main__':
unittest.main()

View File

@ -18,6 +18,7 @@ from unittest.mock import patch
from kfp.components import base_component
from kfp.components import pipeline_task
from kfp.components import placeholders
from kfp.components import structures
@ -37,10 +38,10 @@ component_op = TestComponent(
'sh',
'-c',
'set -ex\necho "$0" "$1" "$2" > "$3"',
structures.InputValuePlaceholder(input_name='input1'),
structures.InputValuePlaceholder(input_name='input2'),
structures.InputValuePlaceholder(input_name='input3'),
structures.OutputPathPlaceholder(output_name='output1'),
placeholders.InputValuePlaceholder(input_name='input1'),
placeholders.InputValuePlaceholder(input_name='input2'),
placeholders.InputValuePlaceholder(input_name='input3'),
placeholders.OutputPathPlaceholder(output_name='output1'),
],
)),
inputs={

View File

@ -18,6 +18,7 @@ from typing import Any, Mapping, Optional, Type, Union
from kfp.components import importer_component
from kfp.components import pipeline_channel
from kfp.components import pipeline_task
from kfp.components import placeholders
from kfp.components import structures
from kfp.components.types import artifact_types
@ -51,8 +52,8 @@ def importer(
name='importer',
implementation=structures.Implementation(
importer=structures.ImporterSpec(
artifact_uri=structures.InputValuePlaceholder(
INPUT_KEY).to_placeholder(),
artifact_uri=placeholders.InputValuePlaceholder(
INPUT_KEY).to_placeholder_string(),
type_schema=artifact_class.TYPE_NAME,
reimport=reimport,
metadata=metadata)),

View File

@ -19,6 +19,7 @@ from typing import Any, List, Mapping, Optional, Union
from kfp.components import constants
from kfp.components import pipeline_channel
from kfp.components import placeholders
from kfp.components import structures
from kfp.components import utils
from kfp.components.types import type_utils
@ -216,7 +217,7 @@ class PipelineTask:
elif isinstance(arg, (dict, list)):
return json.dumps(arg)
elif isinstance(arg, structures.InputValuePlaceholder):
elif isinstance(arg, placeholders.InputValuePlaceholder):
input_name = arg.input_name
if not type_utils.is_parameter_type(
inputs_dict[input_name].type):
@ -227,7 +228,7 @@ class PipelineTask:
if input_name in args or type_utils.is_task_final_status_type(
inputs_dict[input_name].type):
return arg.to_placeholder()
return arg.to_placeholder_string()
else:
input_spec = inputs_dict[input_name]
if input_spec.default is not None:
@ -236,7 +237,7 @@ class PipelineTask:
raise ValueError(
f'No value provided for input: {input_name}.')
elif isinstance(arg, structures.InputUriPlaceholder):
elif isinstance(arg, placeholders.InputUriPlaceholder):
input_name = arg.input_name
if type_utils.is_parameter_type(inputs_dict[input_name].type):
raise TypeError(
@ -245,7 +246,7 @@ class PipelineTask:
'InputUriPlaceholder.')
if input_name in args:
input_uri = arg.to_placeholder()
input_uri = arg.to_placeholder_string()
return input_uri
else:
input_spec = inputs_dict[input_name]
@ -255,7 +256,7 @@ class PipelineTask:
raise ValueError(
f'No value provided for input: {input_name}.')
elif isinstance(arg, structures.InputPathPlaceholder):
elif isinstance(arg, placeholders.InputPathPlaceholder):
input_name = arg.input_name
if type_utils.is_parameter_type(inputs_dict[input_name].type):
raise TypeError(
@ -264,7 +265,7 @@ class PipelineTask:
'InputPathPlaceholder.')
if input_name in args:
input_path = arg.to_placeholder()
input_path = arg.to_placeholder_string()
return input_path
else:
input_spec = inputs_dict[input_name]
@ -274,7 +275,7 @@ class PipelineTask:
raise ValueError(
f'No value provided for input: {input_name}.')
elif isinstance(arg, structures.OutputUriPlaceholder):
elif isinstance(arg, placeholders.OutputUriPlaceholder):
output_name = arg.output_name
if type_utils.is_parameter_type(outputs_dict[output_name].type):
raise TypeError(
@ -282,19 +283,19 @@ class PipelineTask:
f'"{outputs_dict[output_name].type}" cannot be paired with '
'OutputUriPlaceholder.')
return arg.to_placeholder()
return arg.to_placeholder_string()
elif isinstance(arg, structures.OutputPathPlaceholder):
elif isinstance(arg, placeholders.OutputPathPlaceholder):
output_name = arg.output_name
if type_utils.is_parameter_type(outputs_dict[output_name].type):
output_path = structures.OutputParameterPlaceholder(
arg.output_name).to_placeholder()
output_path = placeholders.OutputParameterPlaceholder(
arg.output_name).to_placeholder_string()
else:
output_path = arg.to_placeholder()
output_path = arg.to_placeholder_string()
return output_path
elif isinstance(arg, structures.OutputParameterPlaceholder):
elif isinstance(arg, placeholders.OutputParameterPlaceholder):
output_name = arg.output_name
if not type_utils.is_parameter_type(
outputs_dict[output_name].type):
@ -303,17 +304,17 @@ class PipelineTask:
f'"{outputs_dict[output_name].type}" cannot be paired with '
'OutputUriPlaceholder.')
return arg.to_placeholder()
return arg.to_placeholder_string()
elif isinstance(arg, structures.ConcatPlaceholder):
elif isinstance(arg, placeholders.ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)
elif isinstance(arg, structures.IfPresentPlaceholder):
if arg.if_structure.input_name in argument_values:
result_node = arg.if_structure.then
elif isinstance(arg, placeholders.IfPresentPlaceholder):
if arg.input_name in argument_values:
result_node = arg.then
else:
result_node = arg.if_structure.otherwise
result_node = arg.else_
if result_node is None:
return []

View File

@ -18,6 +18,7 @@ import unittest
from absl.testing import parameterized
from kfp.components import pipeline_task
from kfp.components import placeholders
from kfp.components import structures
V2_YAML = textwrap.dedent("""\
@ -81,8 +82,9 @@ class PipelineTaskTest(parameterized.TestCase):
image='alpine',
command=['sh', '-c', 'echo "$0" >> "$1"'],
args=[
structures.InputValuePlaceholder(input_name='input1'),
structures.OutputPathPlaceholder(output_name='output1'),
placeholders.InputValuePlaceholder(input_name='input1'),
placeholders.OutputPathPlaceholder(
output_name='output1'),
],
)),
inputs={

View File

@ -0,0 +1,467 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains data structures and functions for handling input and output
placeholders."""
import abc
import dataclasses
import json
from json.decoder import JSONArray # type: ignore
from json.scanner import py_make_scanner
import re
from typing import Any, Dict, List, Optional, Union
from kfp.components import base_model
class Placeholder(abc.ABC):
"""Abstract base class for Placeholders.
All placeholders must implement these methods to be handled
appropriately downstream.
"""
@classmethod
@abc.abstractmethod
def from_placeholder_string(cls, placeholder_string: str) -> 'Placeholder':
"""Converts a placeholder string to the placeholder object that
implements this method.
Args:
placeholder_string (str): The placeholder string.
Returns:
Placeholder: The placeholder object that implements this method.
"""
raise NotImplementedError
@classmethod
@abc.abstractmethod
def is_match(cls, placeholder_string: str) -> bool:
"""Checks if the placeholder string matches the placeholder object that
implements this method.
Args:
placeholder_string (str): The placeholder string.
Returns:
bool: Whether the placeholder string matches the placeholder object that implements this method and can be converted to an instance of the placeholder object.
"""
raise NotImplementedError
@abc.abstractmethod
def to_placeholder_string(self) -> str:
"""Converts the placeholder object that implements this to a
placeholder string.
Returns:
str: The placeholder string.
"""
raise NotImplementedError
@abc.abstractmethod
def to_dict(self, by_alias: bool = False) -> Dict[str, Any]:
"""Converts the placeholder object that implements this to a
dictionary. This ensures that this concrete placeholder classes also
inherit from kfp.components.base_model.BaseModel.
Args:
by_alias (bool, optional): Whether to use attribute name to alias field mapping provided by cls._aliases when converting to dictionary. Defaults to False.
Returns:
Dict[str, Any]: Dictionary representation of the object.
"""
raise NotImplementedError
class RegexPlaceholderSerializationMixin(Placeholder):
"""Mixin for *Placeholder objects that handles the
serialization/deserialization of the placeholder."""
_FROM_PLACEHOLDER: Union[re.Pattern, type(NotImplemented)] = NotImplemented
_TO_PLACEHOLDER: Union[str, type(NotImplemented)] = NotImplemented
@classmethod
def is_match(cls, placeholder_string: str) -> bool:
"""Determines if the placeholder_string matches the placeholder pattern
using the _FROM_PLACEHOLDER regex.
Args:
placeholder_string (str): The string (often "{{$.inputs/outputs...}}") to check.
Returns:
bool: Determines if the placeholder_string matches the placeholder pattern.
"""
return cls._FROM_PLACEHOLDER.match(placeholder_string) is not None
@classmethod
def from_placeholder_string(
cls,
placeholder_string: str) -> 'RegexPlaceholderSerializationMixin':
"""Converts a placeholder string into a placeholder object.
Args:
placeholder_string (str): The placeholder.
Returns:
PlaceholderSerializationMixin subclass: The placeholder object.
"""
if cls._FROM_PLACEHOLDER == NotImplemented:
raise NotImplementedError(
f'{cls.__name__} does not support placeholder parsing.')
matches = re.search(cls._FROM_PLACEHOLDER, placeholder_string)
if matches is None:
raise ValueError(
f'Could not parse placeholder: {placeholder_string} into {cls.__name__}'
)
field_names = [field.name for field in dataclasses.fields(cls)]
if len(matches.groups()) > len(field_names):
raise ValueError(
f'Could not parse placeholder string: {placeholder_string}. Expected no more than {len(field_names)} groups matched for fields {field_names}. Got {len(matches.groups())} matched: {matches.groups()}.'
)
kwargs = {field_name: matches[field_name] for field_name in field_names}
return cls(**kwargs)
def to_placeholder_string(self) -> str:
"""Converts a placeholder object into a placeholder string.
Returns:
str: The placeholder string.
"""
if self._TO_PLACEHOLDER == NotImplemented:
raise NotImplementedError(
f'{self.__class__.__name__} does not support creating placeholder strings.'
)
return self._TO_PLACEHOLDER.format(**self.to_dict())
class InputValuePlaceholder(base_model.BaseModel,
RegexPlaceholderSerializationMixin):
"""Class that holds an input value placeholder.
Attributes:
output_name: Name of the input.
"""
input_name: str
_aliases = {'input_name': 'inputValue'}
_TO_PLACEHOLDER = "{{{{$.inputs.parameters['{input_name}']}}}}"
_FROM_PLACEHOLDER = re.compile(
r"\{\{\$\.inputs\.parameters\[(?:''|'|\")(?P<input_name>.+?)(?:''|'|\")]\}\}"
)
class InputPathPlaceholder(base_model.BaseModel,
RegexPlaceholderSerializationMixin):
"""Class that holds an input path placeholder.
Attributes:
output_name: Name of the input.
"""
input_name: str
_aliases = {'input_name': 'inputPath'}
_TO_PLACEHOLDER = "{{{{$.inputs.artifacts['{input_name}'].path}}}}"
_FROM_PLACEHOLDER = re.compile(
r"^\{\{\$\.inputs\.artifacts\[(?:''|'|\")(?P<input_name>.+?)(?:''|'|\")]\.path\}\}$"
)
class InputUriPlaceholder(base_model.BaseModel,
RegexPlaceholderSerializationMixin):
"""Class that holds an input uri placeholder.
Attributes:
output_name: Name of the input.
"""
input_name: str
_aliases = {'input_name': 'inputUri'}
_TO_PLACEHOLDER = "{{{{$.inputs.artifacts['{input_name}'].uri}}}}"
_FROM_PLACEHOLDER = re.compile(
r"^\{\{\$\.inputs\.artifacts\[(?:''|'|\")(?P<input_name>.+?)(?:''|'|\")]\.uri\}\}$"
)
class OutputParameterPlaceholder(base_model.BaseModel,
RegexPlaceholderSerializationMixin):
"""Class that holds an output parameter placeholder.
Attributes:
output_name: Name of the input.
"""
output_name: str
_aliases = {'output_name': 'outputPath'}
_TO_PLACEHOLDER = "{{{{$.outputs.parameters['{output_name}'].output_file}}}}"
_FROM_PLACEHOLDER = re.compile(
r"^\{\{\$\.outputs\.parameters\[(?:''|'|\")(?P<output_name>.+?)(?:''|'|\")]\.output_file\}\}$"
)
class OutputPathPlaceholder(base_model.BaseModel,
RegexPlaceholderSerializationMixin):
"""Class that holds an output path placeholder.
Attributes:
output_name: Name of the input.
"""
output_name: str
_aliases = {'output_name': 'outputPath'}
_TO_PLACEHOLDER = "{{{{$.outputs.artifacts['{output_name}'].path}}}}"
_FROM_PLACEHOLDER = re.compile(
r"^\{\{\$\.outputs\.artifacts\[(?:''|'|\")(?P<output_name>.+?)(?:''|'|\")]\.path\}\}$"
)
class OutputUriPlaceholder(base_model.BaseModel,
RegexPlaceholderSerializationMixin):
"""Class that holds output uri for conditional cases.
Attributes:
output_name: name of the output.
"""
output_name: str
_aliases = {'output_name': 'outputUri'}
_TO_PLACEHOLDER = "{{{{$.outputs.artifacts['{output_name}'].uri}}}}"
_FROM_PLACEHOLDER = re.compile(
r"^\{\{\$\.outputs\.artifacts\[(?:''|'|\")(?P<output_name>.+?)(?:''|'|\")]\.uri\}\}$"
)
CommandLineElement = Union[str, InputValuePlaceholder, InputPathPlaceholder,
InputUriPlaceholder, OutputParameterPlaceholder,
OutputPathPlaceholder, OutputUriPlaceholder,
'IfPresentPlaceholder', 'ConcatPlaceholder']
class ConcatPlaceholder(base_model.BaseModel, Placeholder):
"""Placeholder for concatenating multiple strings. May contain other
placeholders.
Attributes:
items: Elements to concatenate.
"""
items: List[CommandLineElement]
@classmethod
def split_cel_concat_string(self, string: str) -> List[str]:
"""Splits a cel string into a list of strings, which may be normal
strings or placeholder strings.
Args:
cel_string (str): The cel string.
Returns:
List[str]: The list of strings.
"""
concat_char = '+'
start_ends = [(match.start(0), match.end(0)) for match in
InputValuePlaceholder._FROM_PLACEHOLDER.finditer(string)]
items = []
if start_ends:
start = 0
for match_start, match_end in start_ends:
leading_string = string[start:match_start]
if leading_string and leading_string != concat_char:
items.append(leading_string)
items.append(string[match_start:match_end])
start = match_end
trailing_string = string[match_end:]
if trailing_string and trailing_string != concat_char:
items.append(trailing_string)
return items
@classmethod
def is_match(cls, placeholder_string: str) -> bool:
# 'Concat' is the explicit struct for concatenation
# cel splitting handles the cases of {{input}}+{{input}} and {{input}}otherstring
return 'Concat' in json_load_nested_placeholder_aware(
placeholder_string
) or len(
ConcatPlaceholder.split_cel_concat_string(placeholder_string)) > 1
def to_placeholder_struct(self) -> Dict[str, Any]:
return {
"Concat": [
maybe_convert_placeholder_to_placeholder_string(item)
for item in self.items
]
}
def to_placeholder_string(self) -> str:
return json.dumps(self.to_placeholder_struct())
@classmethod
def from_placeholder_string(cls,
placeholder_string: str) -> 'ConcatPlaceholder':
placeholder_struct = json_load_nested_placeholder_aware(
placeholder_string)
if isinstance(placeholder_struct, str):
items = [
maybe_convert_placeholder_string_to_placeholder(item)
for item in cls.split_cel_concat_string(placeholder_struct)
]
return cls(items=items)
elif isinstance(placeholder_struct, dict):
items = [
maybe_convert_placeholder_string_to_placeholder(item)
for item in placeholder_struct['Concat']
]
return ConcatPlaceholder(items=items)
raise ValueError
class IfPresentPlaceholder(base_model.BaseModel, Placeholder):
"""Placeholder for handling cases where an input may or may not be passed.
May contain other placeholders.
Attributes:
input_name: name of the input/output.
then: If the input/output specified in name is present
the command-line argument will be replaced at run-time by the
expanded value of then.
else_: If the input/output specified in name is not present,
the command-line argument will be replaced at run-time by the
expanded value of otherwise.
"""
input_name: str
then: List[CommandLineElement]
else_: Optional[List[CommandLineElement]] = None
_aliases = {'input_name': 'inputName', 'else_': 'else'}
@classmethod
def is_match(cls, string: str) -> bool:
try:
return "IfPresent" in json.loads(string)
except json.decoder.JSONDecodeError:
return False
def to_placeholder_struct(self) -> Dict[str, Any]:
then = [
maybe_convert_placeholder_to_placeholder_string(item)
for item in self.then
] if isinstance(self.then, list) else self.then
struct = {"IfPresent": {"InputName": self.input_name, "Then": then}}
if self.else_:
otherwise = [
maybe_convert_placeholder_to_placeholder_string(item)
for item in self.else_
] if isinstance(self.else_, list) else self.else_
struct["IfPresent"]["Else"] = otherwise
return struct
def to_placeholder_string(self) -> str:
return json.dumps(self.to_placeholder_struct())
@classmethod
def from_placeholder_string(
cks, placeholder_string: str) -> 'IfPresentPlaceholder':
struct = json_load_nested_placeholder_aware(placeholder_string)
struct_body = struct['IfPresent']
then = struct_body['Then']
then = [
maybe_convert_placeholder_string_to_placeholder(item)
for item in then
] if isinstance(then, list) else then
else_ = struct_body.get('Else')
else_ = [
maybe_convert_placeholder_string_to_placeholder(item)
for item in else_
] if isinstance(else_, list) else else_
kwargs = {
'input_name': struct_body['InputName'],
'then': then,
'else_': else_
}
return IfPresentPlaceholder(**kwargs)
def transform_else(self) -> None:
"""Use None instead of empty list for optional."""
self.else_ = None if self.else_ == [] else self.else_
class CustomizedDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def parse_array(*_args, **_kwargs):
values, end = JSONArray(*_args, **_kwargs)
for i, item in enumerate(values):
if isinstance(item, dict):
values[i] = json.dumps(item)
return values, end
self.parse_array = parse_array
self.scan_once = py_make_scanner(self)
def json_load_nested_placeholder_aware(
placeholder_string: str
) -> Union[str, Dict[str, Union[str, List[str], dict]]]:
try:
return json.loads(placeholder_string, cls=CustomizedDecoder)
except json.JSONDecodeError:
return placeholder_string
def maybe_convert_placeholder_string_to_placeholder(
placeholder_string: str) -> CommandLineElement:
"""Infers if a command is a placeholder and converts it to the correct
Placeholder object.
Args:
arg (str): The arg or command to possibly convert.
Returns:
CommandLineElement: The converted command or original string.
"""
if not placeholder_string.startswith('{'):
return placeholder_string
# order matters here!
from_string_placeholders = [
IfPresentPlaceholder,
ConcatPlaceholder,
InputValuePlaceholder,
InputPathPlaceholder,
InputUriPlaceholder,
OutputPathPlaceholder,
OutputUriPlaceholder,
OutputParameterPlaceholder,
]
for placeholder_struct in from_string_placeholders:
if placeholder_struct.is_match(placeholder_string):
return placeholder_struct.from_placeholder_string(
placeholder_string)
return placeholder_string
def maybe_convert_placeholder_to_placeholder_string(
placeholder: CommandLineElement) -> str:
"""Converts a placeholder to a placeholder string if it's a subclass of
Placeholder.
Args:
placeholder (Placeholder): The placeholder to convert.
Returns:
str: The placeholder string.
"""
if isinstance(placeholder, Placeholder):
return placeholder.to_placeholder_struct() if hasattr(
placeholder,
'to_placeholder_struct') else placeholder.to_placeholder_string()
return placeholder

View File

@ -0,0 +1,296 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains tests for kfp.components.placeholders."""
from typing import List
import unittest
from absl.testing import parameterized
from kfp.components import placeholders
from kfp.components import structures
class TestInputValuePlaceholder(parameterized.TestCase):
@parameterized.parameters([
("{{$.inputs.parameters['input1']}}",
placeholders.InputValuePlaceholder('input1')),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.InputValuePlaceholder):
self.assertEqual(
placeholders.InputValuePlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
@parameterized.parameters([
("{{$.inputs.parameters[''input1'']}}",
placeholders.InputValuePlaceholder('input1')),
('{{$.inputs.parameters["input1"]}}',
placeholders.InputValuePlaceholder('input1')),
])
def test_from_placeholder_special_quote_case(
self, placeholder_string: str,
placeholder_obj: placeholders.InputValuePlaceholder):
self.assertEqual(
placeholders.InputValuePlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
class TestInputPathPlaceholder(parameterized.TestCase):
@parameterized.parameters([
("{{$.inputs.artifacts['input1'].path}}",
placeholders.InputPathPlaceholder('input1')),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.InputPathPlaceholder):
self.assertEqual(
placeholders.InputPathPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
class TestInputUriPlaceholder(parameterized.TestCase):
@parameterized.parameters([
("{{$.inputs.artifacts['input1'].uri}}",
placeholders.InputUriPlaceholder('input1')),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.InputUriPlaceholder):
self.assertEqual(
placeholders.InputUriPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
class TestOutputPathPlaceholder(parameterized.TestCase):
@parameterized.parameters([
("{{$.outputs.artifacts['output1'].path}}",
placeholders.OutputPathPlaceholder('output1')),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.OutputPathPlaceholder):
self.assertEqual(
placeholders.OutputPathPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
class TestOutputParameterPlaceholder(parameterized.TestCase):
@parameterized.parameters([
("{{$.outputs.parameters['output1'].output_file}}",
placeholders.OutputParameterPlaceholder('output1')),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.OutputParameterPlaceholder):
self.assertEqual(
placeholders.OutputParameterPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
class TestOutputUriPlaceholder(parameterized.TestCase):
@parameterized.parameters([
("{{$.outputs.artifacts['output1'].uri}}",
placeholders.OutputUriPlaceholder('output1')),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.OutputUriPlaceholder):
self.assertEqual(
placeholders.OutputUriPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
class TestIfPresentPlaceholderStructure(parameterized.TestCase):
def test_else_transform(self):
obj = placeholders.IfPresentPlaceholder(
then='then', input_name='input_name', else_=['something'])
self.assertEqual(obj.else_, ['something'])
obj = placeholders.IfPresentPlaceholder(
then='then', input_name='input_name', else_=[])
self.assertEqual(obj.else_, None)
@parameterized.parameters([
('{"IfPresent": {"InputName": "output1", "Then": "then", "Else": "something"}}',
placeholders.IfPresentPlaceholder(
input_name='output1', then='then', else_='something')),
('{"IfPresent": {"InputName": "output1", "Then": "then"}}',
placeholders.IfPresentPlaceholder(input_name='output1', then='then')),
('{"IfPresent": {"InputName": "output1", "Then": "then"}}',
placeholders.IfPresentPlaceholder('output1', 'then')),
('{"IfPresent": {"InputName": "output1", "Then": ["then"], "Else": ["something"]}}',
placeholders.IfPresentPlaceholder(
input_name='output1', then=['then'], else_=['something'])),
('{"IfPresent": {"InputName": "output1", "Then": ["then", {"IfPresent": {"InputName": "output1", "Then": ["then"], "Else": ["something"]}}], "Else": ["something"]}}',
placeholders.IfPresentPlaceholder(
input_name='output1',
then=[
'then',
placeholders.IfPresentPlaceholder(
input_name='output1', then=['then'], else_=['something'])
],
else_=['something'])),
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.ConcatPlaceholder):
self.assertEqual(
placeholders.IfPresentPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
class TestConcatPlaceholder(parameterized.TestCase):
@parameterized.parameters([
('{"Concat": ["a", "b"]}', placeholders.ConcatPlaceholder(['a', 'b'])),
('{"Concat": ["a", {"Concat": ["b", "c"]}]}',
placeholders.ConcatPlaceholder(
['a', placeholders.ConcatPlaceholder(['b', 'c'])])),
('{"Concat": ["a", {"Concat": ["b", {"IfPresent": {"InputName": "output1", "Then": ["then"], "Else": ["something"]}}]}]}',
placeholders.ConcatPlaceholder([
'a',
placeholders.ConcatPlaceholder([
'b',
placeholders.IfPresentPlaceholder(
input_name='output1', then=['then'], else_=['something'])
])
]))
])
def test_to_from_placeholder(
self, placeholder_string: str,
placeholder_obj: placeholders.ConcatPlaceholder):
self.assertEqual(
placeholders.ConcatPlaceholder.from_placeholder_string(
placeholder_string), placeholder_obj)
self.assertEqual(placeholder_obj.to_placeholder_string(),
placeholder_string)
@parameterized.parameters([
"{{$.inputs.parameters[''input1'']}}+{{$.inputs.parameters[''input2'']}}",
'{"Concat": ["a", "b"]}', '{"Concat": ["a", {"Concat": ["b", "c"]}]}',
"{{$.inputs.parameters[''input1'']}}something{{$.inputs.parameters[''input2'']}}",
"{{$.inputs.parameters[''input_prefix'']}}some value",
"some value{{$.inputs.parameters[''input_suffix'']}}",
"some value{{$.inputs.parameters[''input_infix'']}}some value"
])
def test_is_match(self, placeholder: str):
self.assertTrue(placeholders.ConcatPlaceholder.is_match(placeholder))
@parameterized.parameters([
("{{$.inputs.parameters[''input1'']}}something{{$.inputs.parameters[''input2'']}}",
[
"{{$.inputs.parameters[''input1'']}}", 'something',
"{{$.inputs.parameters[''input2'']}}"
]),
("{{$.inputs.parameters[''input_prefix'']}}some value",
["{{$.inputs.parameters[''input_prefix'']}}", 'some value']),
("some value{{$.inputs.parameters[''input_suffix'']}}",
['some value', "{{$.inputs.parameters[''input_suffix'']}}"]),
("some value{{$.inputs.parameters[''input_infix'']}}some value", [
'some value', "{{$.inputs.parameters[''input_infix'']}}",
'some value'
]),
("{{$.inputs.parameters[''input1'']}}+{{$.inputs.parameters[''input2'']}}",
[
"{{$.inputs.parameters[''input1'']}}",
"{{$.inputs.parameters[''input2'']}}"
])
])
def test_split_cel_concat_string(self, placeholder: str,
expected: List[str]):
self.assertEqual(
placeholders.ConcatPlaceholder.split_cel_concat_string(placeholder),
expected)
class TestProcessCommandArg(unittest.TestCase):
def test_string(self):
arg = 'test'
struct = placeholders.maybe_convert_placeholder_string_to_placeholder(
arg)
self.assertEqual(struct, arg)
def test_input_value_placeholder(self):
arg = "{{$.inputs.parameters['input1']}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
arg)
expected = placeholders.InputValuePlaceholder(input_name='input1')
self.assertEqual(actual, expected)
def test_input_path_placeholder(self):
arg = "{{$.inputs.artifacts['input1'].path}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
arg)
expected = placeholders.InputPathPlaceholder('input1')
self.assertEqual(actual, expected)
def test_input_uri_placeholder(self):
arg = "{{$.inputs.artifacts['input1'].uri}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
arg)
expected = placeholders.InputUriPlaceholder('input1')
self.assertEqual(actual, expected)
def test_output_path_placeholder(self):
arg = "{{$.outputs.artifacts['output1'].path}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
arg)
expected = placeholders.OutputPathPlaceholder('output1')
self.assertEqual(actual, expected)
def test_output_uri_placeholder(self):
placeholder = "{{$.outputs.artifacts['output1'].uri}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
placeholder)
expected = placeholders.OutputUriPlaceholder('output1')
self.assertEqual(actual, expected)
def test_output_parameter_placeholder(self):
placeholder = "{{$.outputs.parameters['output1'].output_file}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
placeholder)
expected = placeholders.OutputParameterPlaceholder('output1')
self.assertEqual(actual, expected)
def test_concat_placeholder(self):
placeholder = "{{$.inputs.parameters[''input1'']}}+{{$.inputs.parameters[''input2'']}}"
actual = placeholders.maybe_convert_placeholder_string_to_placeholder(
placeholder)
expected = placeholders.ConcatPlaceholder(items=[
placeholders.InputValuePlaceholder(input_name='input1'),
placeholders.InputValuePlaceholder(input_name='input2')
])
self.assertEqual(actual, expected)

View File

@ -15,17 +15,16 @@
import ast
import collections
import dataclasses
import functools
import itertools
import re
from typing import Any, Dict, List, Mapping, Optional, Type, TypeVar, Union
from typing import Any, Dict, List, Mapping, Optional, Union
import uuid
import kfp
from kfp import dsl
from kfp.compiler import compiler
from kfp.components import base_model
from kfp.components import placeholders
from kfp.components import utils
from kfp.components import v1_components
from kfp.components import v1_structures
@ -160,221 +159,6 @@ class OutputSpec(base_model.BaseModel):
return False
P = TypeVar('P', bound='PlaceholderSerializationMixin')
class PlaceholderSerializationMixin:
"""Mixin for *Placeholder objects that handles the
serialization/deserialization of the placeholder."""
_FROM_PLACEHOLDER_REGEX: Union[str, type(NotImplemented)] = NotImplemented
_TO_PLACEHOLDER_TEMPLATE_STRING: Union[
str, type(NotImplemented)] = NotImplemented
@classmethod
def _is_input_placeholder(cls) -> bool:
"""Checks if the class is an InputPlaceholder (rather than an
OutputPlaceholder).
Returns:
bool: True if the class is an InputPlaceholder, False otherwise.
"""
field_names = {field.name for field in dataclasses.fields(cls)}
return "input_name" in field_names
@classmethod
def is_match(cls: Type[P], placeholder_string: str) -> bool:
"""Determines if the placeholder_string matches the placeholder
pattern.
Args:
placeholder_string (str): The string (often "{{$.inputs/outputs...}}") to check.
Returns:
bool: Determines if the placeholder_string matches the placeholder pattern.
"""
return re.match(cls._FROM_PLACEHOLDER_REGEX,
placeholder_string) is not None
@classmethod
def from_placeholder(cls: Type[P], placeholder_string: str) -> P:
"""Converts a placeholder string into a placeholder object.
Args:
placeholder_string (str): The placeholder.
Returns:
PlaceholderSerializationMixin subclass: The placeholder object.
"""
if cls._FROM_PLACEHOLDER_REGEX == NotImplemented:
raise NotImplementedError(
f'{cls.__name__} does not support placeholder parsing.')
matches = re.search(cls._FROM_PLACEHOLDER_REGEX, placeholder_string)
if matches is None:
raise ValueError(
f'Could not parse placeholder: {placeholder_string} into {cls.__name__}'
)
if cls._is_input_placeholder():
return cls(input_name=matches[1]) # type: ignore
else:
return cls(output_name=matches[1]) # type: ignore
def to_placeholder(self: P) -> str:
"""Converts a placeholder object into a placeholder string.
Returns:
str: The placeholder string.
"""
if self._TO_PLACEHOLDER_TEMPLATE_STRING == NotImplemented:
raise NotImplementedError(
f'{self.__class__.__name__} does not support creating placeholder strings.'
)
attr_name = 'input_name' if self._is_input_placeholder(
) else 'output_name'
value = getattr(self, attr_name)
return self._TO_PLACEHOLDER_TEMPLATE_STRING.format(value)
class InputValuePlaceholder(base_model.BaseModel,
PlaceholderSerializationMixin):
"""Class that holds input value for conditional cases.
Attributes:
input_name: name of the input.
"""
input_name: str
_aliases = {'input_name': 'inputValue'}
_TO_PLACEHOLDER_TEMPLATE_STRING = "{{{{$.inputs.parameters['{}']}}}}"
_FROM_PLACEHOLDER_REGEX = r"\{\{\$\.inputs\.parameters\[(?:''|'|\")(.+?)(?:''|'|\")]\}\}"
class InputPathPlaceholder(base_model.BaseModel, PlaceholderSerializationMixin):
"""Class that holds input path for conditional cases.
Attributes:
input_name: name of the input.
"""
input_name: str
_aliases = {'input_name': 'inputPath'}
_TO_PLACEHOLDER_TEMPLATE_STRING = "{{{{$.inputs.artifacts['{}'].path}}}}"
_FROM_PLACEHOLDER_REGEX = r"\{\{\$\.inputs\.artifacts\[(?:''|'|\")(.+?)(?:''|'|\")]\.path\}\}"
class InputUriPlaceholder(base_model.BaseModel, PlaceholderSerializationMixin):
"""Class that holds input uri for conditional cases.
Attributes:
input_name: name of the input.
"""
input_name: str
_aliases = {'input_name': 'inputUri'}
_TO_PLACEHOLDER_TEMPLATE_STRING = "{{{{$.inputs.artifacts['{}'].uri}}}}"
_FROM_PLACEHOLDER_REGEX = r"\{\{\$\.inputs\.artifacts\[(?:''|'|\")(.+?)(?:''|'|\")]\.uri\}\}"
class OutputParameterPlaceholder(base_model.BaseModel,
PlaceholderSerializationMixin):
"""Class that holds output path for conditional cases.
Attributes:
output_name: name of the output.
"""
output_name: str
_aliases = {'output_name': 'outputPath'}
_TO_PLACEHOLDER_TEMPLATE_STRING = "{{{{$.outputs.parameters['{}'].output_file}}}}"
_FROM_PLACEHOLDER_REGEX = r"\{\{\$\.outputs\.parameters\[(?:''|'|\")(.+?)(?:''|'|\")]\.output_file\}\}"
class OutputPathPlaceholder(base_model.BaseModel,
PlaceholderSerializationMixin):
"""Class that holds output path for conditional cases.
Attributes:
output_name: name of the output.
"""
output_name: str
_aliases = {'output_name': 'outputPath'}
_TO_PLACEHOLDER_TEMPLATE_STRING = "{{{{$.outputs.artifacts['{}'].path}}}}"
_FROM_PLACEHOLDER_REGEX = r"\{\{\$\.outputs\.artifacts\[(?:''|'|\")(.+?)(?:''|'|\")]\.path\}\}"
class OutputUriPlaceholder(base_model.BaseModel, PlaceholderSerializationMixin):
"""Class that holds output uri for conditional cases.
Attributes:
output_name: name of the output.
"""
output_name: str
_aliases = {'output_name': 'outputUri'}
_TO_PLACEHOLDER_TEMPLATE_STRING = "{{{{$.outputs.artifacts['{}'].uri}}}}"
_FROM_PLACEHOLDER_REGEX = r"\{\{\$\.outputs\.artifacts\[(?:''|'|\")(.+?)(?:''|'|\")]\.uri\}\}"
ValidCommandArgs = Union[str, InputValuePlaceholder, InputPathPlaceholder,
InputUriPlaceholder, OutputPathPlaceholder,
OutputUriPlaceholder, OutputParameterPlaceholder,
'IfPresentPlaceholder', 'ConcatPlaceholder']
class ConcatPlaceholder(base_model.BaseModel):
"""Class that extends basePlaceholders for concatenation.
Attributes:
items: string or ValidCommandArgs for concatenation.
"""
items: List[ValidCommandArgs]
_aliases = {'items': 'concat'}
@classmethod
def from_concat_string(cls, concat_string: str) -> 'ConcatPlaceholder':
"""Creates a concat placeholder from an IR string indicating
concatenation.
Args:
concat_string (str): The IR string (e.g. {{$.inputs.parameters['input1']}}+{{$.inputs.parameters['input2']}})
Returns:
ConcatPlaceholder: The ConcatPlaceholder instance.
"""
items = []
for a in concat_string.split('+'):
items.extend([maybe_convert_command_arg_to_placeholder(a), '+'])
del items[-1]
return ConcatPlaceholder(items=items)
class IfPresentPlaceholderStructure(base_model.BaseModel):
"""Class that holds structure for conditional cases.
Attributes:
input_name: name of the input/output.
then: If the input/output specified in name is present,
the command-line argument will be replaced at run-time by the
expanded value of then.
otherwise: If the input/output specified in name is not present,
the command-line argument will be replaced at run-time by the
expanded value of otherwise.
"""
input_name: str
then: List[ValidCommandArgs]
otherwise: Optional[List[ValidCommandArgs]] = None
_aliases = {'input_name': 'inputName', 'otherwise': 'else'}
def transform_otherwise(self) -> None:
"""Use None instead of empty list for optional."""
self.otherwise = None if self.otherwise == [] else self.otherwise
class IfPresentPlaceholder(base_model.BaseModel):
"""Class that extends basePlaceholders for conditional cases.
Attributes:
if_present (ifPresent): holds structure for conditional cases.
"""
if_structure: IfPresentPlaceholderStructure
_aliases = {'if_structure': 'ifPresent'}
class ResourceSpec(base_model.BaseModel):
"""The resource requirements of a container execution.
@ -402,9 +186,9 @@ class ContainerSpec(base_model.BaseModel):
resources (optional): the specification on the resource requirements.
"""
image: str
command: Optional[List[ValidCommandArgs]] = None
args: Optional[List[ValidCommandArgs]] = None
env: Optional[Mapping[str, ValidCommandArgs]] = None
command: Optional[List[placeholders.CommandLineElement]] = None
args: Optional[List[placeholders.CommandLineElement]] = None
env: Optional[Mapping[str, placeholders.CommandLineElement]] = None
resources: Optional[ResourceSpec] = None
def transform_command(self) -> None:
@ -432,16 +216,17 @@ class ContainerSpec(base_model.BaseModel):
Returns:
ContainerSpec: The ContainerSpec instance.
"""
args = container_dict.get('args')
if args is not None:
args = [
maybe_convert_command_arg_to_placeholder(arg) for arg in args
placeholders.maybe_convert_placeholder_string_to_placeholder(
arg) for arg in args
]
command = container_dict.get('command')
if command is not None:
command = [
maybe_convert_command_arg_to_placeholder(c) for c in command
placeholders.maybe_convert_placeholder_string_to_placeholder(c)
for c in command
]
return ContainerSpec(
image=container_dict['image'],
@ -451,40 +236,6 @@ class ContainerSpec(base_model.BaseModel):
resources=None) # can only be set on tasks
def maybe_convert_command_arg_to_placeholder(arg: str) -> ValidCommandArgs:
"""Infers if a command is a placeholder and converts it to the correct
Placeholder object.
Args:
arg (str): The arg or command to possibly convert.
Returns:
ValidCommandArgs: The converted command or original string.
"""
# short path to avoid checking all regexs
if not arg.startswith('{{$'):
return arg
# handle concat placeholder
# TODO: change when support for concat is added to IR
if '}}+{{' in arg:
return ConcatPlaceholder.from_concat_string(arg)
placeholders = {
InputValuePlaceholder,
InputPathPlaceholder,
InputUriPlaceholder,
OutputPathPlaceholder,
OutputUriPlaceholder,
OutputParameterPlaceholder,
}
for placeholder_struct in placeholders:
if placeholder_struct.is_match(arg):
return placeholder_struct.from_placeholder(arg)
return arg
class TaskSpec(base_model.BaseModel):
"""The spec of a pipeline task.
@ -598,66 +349,52 @@ def try_to_get_dict_from_string(string: str) -> Union[dict, str]:
def convert_str_or_dict_to_placeholder(
element: Union[str, dict,
ValidCommandArgs]) -> Union[str, ValidCommandArgs]:
element: Union[str,
dict]) -> Union[str, placeholders.CommandLineElement]:
"""Converts command and args elements to a placholder type based on value
of the key of the placeholder string, else returns the input.
Args:
element (Union[str, dict, ValidCommandArgs]): A ContainerSpec.command or ContainerSpec.args element.
Raises:
TypeError: If `element` is invalid.
element (Union[str, dict, placeholders.CommandLineElement]): A ContainerSpec.command or ContainerSpec.args element.
Returns:
Union[str, ValidCommandArgs]: Possibly converted placeholder or original input.
Union[str, placeholders.CommandLineElement]: Possibly converted placeholder or original input.
"""
if not isinstance(element, (dict, str)):
if not isinstance(element, dict):
return element
elif isinstance(element, str):
res = try_to_get_dict_from_string(element)
if not isinstance(res, dict):
return element
elif isinstance(element, dict):
res = element
else:
raise TypeError(
f'Invalid type for arg: {type(element)}. Expected str or dict.')
has_one_entry = len(res) == 1
has_one_entry = len(element) == 1
if not has_one_entry:
raise ValueError(
f'Got unexpected dictionary {res}. Expected a dictionary with one entry.'
f'Got unexpected dictionary {element}. Expected a dictionary with one entry.'
)
first_key = list(res.keys())[0]
first_value = list(res.values())[0]
first_key = list(element.keys())[0]
first_value = list(element.values())[0]
if first_key == 'inputValue':
return InputValuePlaceholder(
return placeholders.InputValuePlaceholder(
input_name=utils.sanitize_input_name(first_value))
elif first_key == 'inputPath':
return InputPathPlaceholder(
return placeholders.InputPathPlaceholder(
input_name=utils.sanitize_input_name(first_value))
elif first_key == 'inputUri':
return InputUriPlaceholder(
return placeholders.InputUriPlaceholder(
input_name=utils.sanitize_input_name(first_value))
elif first_key == 'outputPath':
return OutputPathPlaceholder(
return placeholders.OutputPathPlaceholder(
output_name=utils.sanitize_input_name(first_value))
elif first_key == 'outputUri':
return OutputUriPlaceholder(
return placeholders.OutputUriPlaceholder(
output_name=utils.sanitize_input_name(first_value))
elif first_key == 'ifPresent':
structure_kwargs = res['ifPresent']
structure_kwargs = element['ifPresent']
structure_kwargs['input_name'] = structure_kwargs.pop('inputName')
structure_kwargs['otherwise'] = structure_kwargs.pop('else')
structure_kwargs['then'] = [
@ -668,13 +405,11 @@ def convert_str_or_dict_to_placeholder(
convert_str_or_dict_to_placeholder(e)
for e in structure_kwargs['otherwise']
]
if_structure = IfPresentPlaceholderStructure(**structure_kwargs)
return IfPresentPlaceholder(if_structure=if_structure)
return placeholders.IfPresentPlaceholder(**structure_kwargs)
elif first_key == 'concat':
return ConcatPlaceholder(items=[
convert_str_or_dict_to_placeholder(e) for e in res['concat']
return placeholders.ConcatPlaceholder(items=[
convert_str_or_dict_to_placeholder(e) for e in element['concat']
])
else:
@ -683,9 +418,9 @@ def convert_str_or_dict_to_placeholder(
)
def _check_valid_placeholder_reference(valid_inputs: List[str],
valid_outputs: List[str],
placeholder: ValidCommandArgs) -> None:
def _check_valid_placeholder_reference(
valid_inputs: List[str], valid_outputs: List[str],
placeholder: placeholders.CommandLineElement) -> None:
"""Validates input/output placeholders refer to an existing input/output.
Args:
@ -701,25 +436,26 @@ def _check_valid_placeholder_reference(valid_inputs: List[str],
"""
if isinstance(
placeholder,
(InputValuePlaceholder, InputPathPlaceholder, InputUriPlaceholder)):
(placeholders.InputValuePlaceholder, placeholders.InputPathPlaceholder,
placeholders.InputUriPlaceholder)):
if placeholder.input_name not in valid_inputs:
raise ValueError(
f'Argument "{placeholder}" references non-existing input.')
elif isinstance(placeholder, (OutputParameterPlaceholder,
OutputPathPlaceholder, OutputUriPlaceholder)):
elif isinstance(placeholder, (placeholders.OutputParameterPlaceholder,
placeholders.OutputPathPlaceholder,
placeholders.OutputUriPlaceholder)):
if placeholder.output_name not in valid_outputs:
raise ValueError(
f'Argument "{placeholder}" references non-existing output.')
elif isinstance(placeholder, IfPresentPlaceholder):
if placeholder.if_structure.input_name not in valid_inputs:
elif isinstance(placeholder, placeholders.IfPresentPlaceholder):
if placeholder.input_name not in valid_inputs:
raise ValueError(
f'Argument "{placeholder}" references non-existing input.')
for placeholder in itertools.chain(
placeholder.if_structure.then or [],
placeholder.if_structure.otherwise or []):
for placeholder in itertools.chain(placeholder.then or [],
placeholder.else_ or []):
_check_valid_placeholder_reference(valid_inputs, valid_outputs,
placeholder)
elif isinstance(placeholder, ConcatPlaceholder):
elif isinstance(placeholder, placeholders.ConcatPlaceholder):
for placeholder in placeholder.items:
_check_valid_placeholder_reference(valid_inputs, valid_outputs,
placeholder)
@ -728,10 +464,13 @@ def _check_valid_placeholder_reference(valid_inputs: List[str],
f'Unexpected argument "{placeholder}" of type {type(placeholder)}.')
ValidCommandArgTypes = (str, InputValuePlaceholder, InputPathPlaceholder,
InputUriPlaceholder, OutputPathPlaceholder,
OutputUriPlaceholder, IfPresentPlaceholder,
ConcatPlaceholder)
ValidCommandArgTypes = (str, placeholders.InputValuePlaceholder,
placeholders.InputPathPlaceholder,
placeholders.InputUriPlaceholder,
placeholders.OutputPathPlaceholder,
placeholders.OutputUriPlaceholder,
placeholders.IfPresentPlaceholder,
placeholders.ConcatPlaceholder)
class ComponentSpec(base_model.BaseModel):
@ -811,7 +550,7 @@ class ComponentSpec(base_model.BaseModel):
Raises:
ValueError: If implementation is not found.
TypeError: if any argument is neither a str nor Dict.
TypeError: If any argument is neither a str nor Dict.
"""
component_dict = v1_component_spec.to_dict()
if component_dict.get('implementation') is None:
@ -819,40 +558,37 @@ class ComponentSpec(base_model.BaseModel):
if 'container' not in component_dict.get(
'implementation'): # type: ignore
raise NotImplementedError
raise NotImplementedError('Container implementation not found.')
def convert_v1_if_present_placholder_to_v2(
arg: Dict[str, Any]) -> Union[Dict[str, Any], ValidCommandArgs]:
arg: Dict[str, Any]
) -> Union[Dict[str, Any], placeholders.CommandLineElement]:
if isinstance(arg, str):
arg = try_to_get_dict_from_string(arg)
if not isinstance(arg, dict):
if isinstance(arg, str):
return arg
if 'if' in arg:
if_placeholder_values = arg['if']
if_placeholder_values_then = list(if_placeholder_values['then'])
try:
if_placeholder_values_else = list(
if_placeholder_values['else'])
except KeyError:
if_placeholder_values_else = []
return IfPresentPlaceholder(
if_structure=IfPresentPlaceholderStructure(
input_name=utils.sanitize_input_name(
if_placeholder_values['cond']['isPresent']),
then=[
convert_str_or_dict_to_placeholder(
convert_v1_if_present_placholder_to_v2(val))
for val in if_placeholder_values_then
],
otherwise=[
convert_str_or_dict_to_placeholder(
convert_v1_if_present_placholder_to_v2(val))
for val in if_placeholder_values_else
]))
if_ = arg['if']
input_name = utils.sanitize_input_name(if_['cond']['isPresent'])
then_ = if_['then']
else_ = if_.get('else', [])
return placeholders.IfPresentPlaceholder(
input_name=input_name,
then=[
convert_str_or_dict_to_placeholder(
convert_v1_if_present_placholder_to_v2(val))
for val in then_
],
else_=[
convert_str_or_dict_to_placeholder(
convert_v1_if_present_placholder_to_v2(val))
for val in else_
])
elif 'concat' in arg:
return ConcatPlaceholder(items=[
return placeholders.ConcatPlaceholder(items=[
convert_str_or_dict_to_placeholder(
convert_v1_if_present_placholder_to_v2(val))
for val in arg['concat']
@ -866,18 +602,17 @@ class ComponentSpec(base_model.BaseModel):
implementation = component_dict['implementation']['container']
implementation['command'] = [
convert_v1_if_present_placholder_to_v2(command)
for command in implementation.pop('command', [])
for command in implementation.get('command', [])
]
implementation['args'] = [
convert_v1_if_present_placholder_to_v2(command)
for command in implementation.pop('args', [])
for command in implementation.get('args', [])
]
implementation['env'] = {
key: convert_v1_if_present_placholder_to_v2(command)
for key, command in implementation.pop('env', {}).items()
for key, command in implementation.get('env', {}).items()
}
container_spec = ContainerSpec(image=implementation['image'])
# Must assign these after the constructor call, otherwise it won't work.
if implementation['command']:
container_spec.command = implementation['command']

View File

@ -21,6 +21,7 @@ import unittest
from absl.testing import parameterized
from kfp import compiler
from kfp.components import placeholders
from kfp.components import structures
V1_YAML_IF_PLACEHOLDER = textwrap.dedent("""\
@ -48,18 +49,17 @@ COMPONENT_SPEC_IF_PLACEHOLDER = structures.ComponentSpec(
container=structures.ContainerSpec(
image='alpine',
args=[
structures.IfPresentPlaceholder(
if_structure=structures.IfPresentPlaceholderStructure(
input_name='optional_input_1',
then=[
'--arg1',
structures.InputUriPlaceholder(
input_name='optional_input_1'),
],
otherwise=[
'--arg2',
'default',
]))
placeholders.IfPresentPlaceholder(
input_name='optional_input_1',
then=[
'--arg1',
placeholders.InputUriPlaceholder(
input_name='optional_input_1'),
],
else_=[
'--arg2',
'default',
])
])),
inputs={
'optional_input_1': structures.InputSpec(type='String', default=None)
@ -83,9 +83,10 @@ COMPONENT_SPEC_CONCAT_PLACEHOLDER = structures.ComponentSpec(
container=structures.ContainerSpec(
image='alpine',
args=[
structures.ConcatPlaceholder(items=[
placeholders.ConcatPlaceholder(items=[
'--arg1',
structures.InputValuePlaceholder(input_name='input_prefix'),
placeholders.InputValuePlaceholder(
input_name='input_prefix'),
])
])),
inputs={'input_prefix': structures.InputSpec(type='String')},
@ -121,25 +122,24 @@ COMPONENT_SPEC_NESTED_PLACEHOLDER = structures.ComponentSpec(
container=structures.ContainerSpec(
image='alpine',
args=[
structures.ConcatPlaceholder(items=[
placeholders.ConcatPlaceholder(items=[
'--arg1',
structures.IfPresentPlaceholder(
if_structure=structures.IfPresentPlaceholderStructure(
input_name='input_prefix',
then=[
placeholders.IfPresentPlaceholder(
input_name='input_prefix',
then=[
'--arg1',
placeholders.InputValuePlaceholder(
input_name='input_prefix'),
],
else_=[
'--arg2',
'default',
placeholders.ConcatPlaceholder(items=[
'--arg1',
structures.InputValuePlaceholder(
placeholders.InputValuePlaceholder(
input_name='input_prefix'),
],
otherwise=[
'--arg2',
'default',
structures.ConcatPlaceholder(items=[
'--arg1',
structures.InputValuePlaceholder(
input_name='input_prefix'),
]),
])),
]),
]),
])
])),
inputs={'input_prefix': structures.InputSpec(type='String')},
@ -163,9 +163,9 @@ class StructuresTest(parameterized.TestCase):
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
structures.InputValuePlaceholder(
placeholders.InputValuePlaceholder(
input_name='input000'),
structures.OutputPathPlaceholder(
placeholders.OutputPathPlaceholder(
output_name='output1'),
],
)),
@ -186,9 +186,9 @@ class StructuresTest(parameterized.TestCase):
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
structures.InputValuePlaceholder(
placeholders.InputValuePlaceholder(
input_name='input1'),
structures.OutputPathPlaceholder(
placeholders.OutputPathPlaceholder(
output_name='output000'),
],
)),
@ -207,8 +207,8 @@ class StructuresTest(parameterized.TestCase):
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
structures.InputValuePlaceholder(input_name='input1'),
structures.OutputParameterPlaceholder(
placeholders.InputValuePlaceholder(input_name='input1'),
placeholders.OutputParameterPlaceholder(
output_name='output1'),
],
)),
@ -292,8 +292,8 @@ sdkVersion: kfp-2.0.0-alpha.2
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
structures.InputValuePlaceholder(input_name='input1'),
structures.OutputParameterPlaceholder(
placeholders.InputValuePlaceholder(input_name='input1'),
placeholders.OutputParameterPlaceholder(
output_name='output1'),
],
)),
@ -362,13 +362,13 @@ sdkVersion: kfp-2.0.0-alpha.2
'echo "$0" > "$2" cp "$1" "$3" '),
],
args=[
structures.InputValuePlaceholder(
placeholders.InputValuePlaceholder(
input_name='input_parameter'),
structures.InputPathPlaceholder(
placeholders.InputPathPlaceholder(
input_name='input_artifact'),
structures.OutputPathPlaceholder(
placeholders.OutputPathPlaceholder(
output_name='output_1'),
structures.OutputPathPlaceholder(
placeholders.OutputPathPlaceholder(
output_name='output_2'),
],
env={},
@ -384,163 +384,6 @@ sdkVersion: kfp-2.0.0-alpha.2
self.assertEqual(generated_spec, expected_spec)
class TestInputValuePlaceholder(unittest.TestCase):
def test_to_placeholder(self):
structure = structures.InputValuePlaceholder('input1')
actual = structure.to_placeholder()
expected = "{{$.inputs.parameters['input1']}}"
self.assertEqual(
actual,
expected,
)
def test_from_placeholder_single_quote(self):
placeholder = "{{$.inputs.parameters['input1']}}"
expected = structures.InputValuePlaceholder('input1')
actual = structures.InputValuePlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
def test_from_placeholder_double_single_quote(self):
placeholder = "{{$.inputs.parameters[''input1'']}}"
expected = structures.InputValuePlaceholder('input1')
actual = structures.InputValuePlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
def test_from_placeholder_double_quote(self):
placeholder = '{{$.inputs.parameters["input1"]}}'
expected = structures.InputValuePlaceholder('input1')
actual = structures.InputValuePlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
class TestInputPathPlaceholder(unittest.TestCase):
def test_to_placeholder(self):
structure = structures.InputPathPlaceholder('input1')
actual = structure.to_placeholder()
expected = "{{$.inputs.artifacts['input1'].path}}"
self.assertEqual(
actual,
expected,
)
def test_from_placeholder(self):
placeholder = "{{$.inputs.artifacts['input1'].path}}"
expected = structures.InputPathPlaceholder('input1')
actual = structures.InputPathPlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
class TestInputUriPlaceholder(unittest.TestCase):
def test_to_placeholder(self):
structure = structures.InputUriPlaceholder('input1')
actual = structure.to_placeholder()
expected = "{{$.inputs.artifacts['input1'].uri}}"
self.assertEqual(
actual,
expected,
)
def test_from_placeholder(self):
placeholder = "{{$.inputs.artifacts['input1'].uri}}"
expected = structures.InputUriPlaceholder('input1')
actual = structures.InputUriPlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
class TestOutputPathPlaceholder(unittest.TestCase):
def test_to_placeholder(self):
structure = structures.OutputPathPlaceholder('output1')
actual = structure.to_placeholder()
expected = "{{$.outputs.artifacts['output1'].path}}"
self.assertEqual(
actual,
expected,
)
def test_from_placeholder(self):
placeholder = "{{$.outputs.artifacts['output1'].path}}"
expected = structures.OutputPathPlaceholder('output1')
actual = structures.OutputPathPlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
class TestOutputParameterPlaceholder(unittest.TestCase):
def test_to_placeholder(self):
structure = structures.OutputParameterPlaceholder('output1')
actual = structure.to_placeholder()
expected = "{{$.outputs.parameters['output1'].output_file}}"
self.assertEqual(
actual,
expected,
)
def test_from_placeholder(self):
placeholder = "{{$.outputs.parameters['output1'].output_file}}"
expected = structures.OutputParameterPlaceholder('output1')
actual = structures.OutputParameterPlaceholder.from_placeholder(
placeholder)
self.assertEqual(
actual,
expected,
)
class TestOutputUriPlaceholder(unittest.TestCase):
def test_to_placeholder(self):
structure = structures.OutputUriPlaceholder('output1')
actual = structure.to_placeholder()
expected = "{{$.outputs.artifacts['output1'].uri}}"
self.assertEqual(
actual,
expected,
)
def test_from_placeholder(self):
placeholder = "{{$.outputs.artifacts['output1'].uri}}"
expected = structures.OutputUriPlaceholder('output1')
actual = structures.OutputUriPlaceholder.from_placeholder(placeholder)
self.assertEqual(
actual,
expected,
)
class TestIfPresentPlaceholderStructure(unittest.TestCase):
def test_otherwise(self):
obj = structures.IfPresentPlaceholderStructure(
then='then', input_name='input_name', otherwise=['something'])
self.assertEqual(obj.otherwise, ['something'])
obj = structures.IfPresentPlaceholderStructure(
then='then', input_name='input_name', otherwise=[])
self.assertEqual(obj.otherwise, None)
class TestContainerSpec(unittest.TestCase):
def test_command_and_args(self):
@ -702,62 +545,6 @@ class TestOutputSpec(parameterized.TestCase):
self.assertEqual(output_spec.type, 'Artifact')
class TestProcessCommandArg(unittest.TestCase):
def test_string(self):
arg = 'test'
struct = structures.maybe_convert_command_arg_to_placeholder(arg)
self.assertEqual(struct, arg)
def test_input_value_placeholder(self):
arg = "{{$.inputs.parameters['input1']}}"
actual = structures.maybe_convert_command_arg_to_placeholder(arg)
expected = structures.InputValuePlaceholder(input_name='input1')
self.assertEqual(actual, expected)
def test_input_path_placeholder(self):
arg = "{{$.inputs.artifacts['input1'].path}}"
actual = structures.maybe_convert_command_arg_to_placeholder(arg)
expected = structures.InputPathPlaceholder('input1')
self.assertEqual(actual, expected)
def test_input_uri_placeholder(self):
arg = "{{$.inputs.artifacts['input1'].uri}}"
actual = structures.maybe_convert_command_arg_to_placeholder(arg)
expected = structures.InputUriPlaceholder('input1')
self.assertEqual(actual, expected)
def test_output_path_placeholder(self):
arg = "{{$.outputs.artifacts['output1'].path}}"
actual = structures.maybe_convert_command_arg_to_placeholder(arg)
expected = structures.OutputPathPlaceholder('output1')
self.assertEqual(actual, expected)
def test_output_uri_placeholder(self):
placeholder = "{{$.outputs.artifacts['output1'].uri}}"
actual = structures.maybe_convert_command_arg_to_placeholder(
placeholder)
expected = structures.OutputUriPlaceholder('output1')
self.assertEqual(actual, expected)
def test_output_parameter_placeholder(self):
placeholder = "{{$.outputs.parameters['output1'].output_file}}"
actual = structures.maybe_convert_command_arg_to_placeholder(
placeholder)
expected = structures.OutputParameterPlaceholder('output1')
self.assertEqual(actual, expected)
def test_concat_placeholder(self):
placeholder = "{{$.inputs.parameters[''input1'']}}+{{$.inputs.parameters[''input2'']}}"
actual = structures.maybe_convert_command_arg_to_placeholder(
placeholder)
expected = structures.ConcatPlaceholder(items=[
structures.InputValuePlaceholder(input_name='input1'), '+',
structures.InputValuePlaceholder(input_name='input2')
])
self.assertEqual(actual, expected)
V1_YAML = textwrap.dedent("""\
implementation:
container:
@ -974,8 +761,9 @@ sdkVersion: kfp-2.0.0-alpha.2""")
image='alpine',
command=['sh', '-c', 'echo "$0" >> "$1"'],
args=[
structures.InputValuePlaceholder(input_name='input1'),
structures.OutputPathPlaceholder(output_name='output1')
placeholders.InputValuePlaceholder(input_name='input1'),
placeholders.OutputPathPlaceholder(
output_name='output1')
],
env=None,
resources=None),
@ -1041,7 +829,7 @@ sdkVersion: kfp-2.0.0-alpha.2""")
command=['sh', '-c', 'echo "$0" "$1"'],
args=[
'input: ',
structures.InputValuePlaceholder(
placeholders.InputValuePlaceholder(
input_name='optional_input_1')
],
env=None,
@ -1112,10 +900,10 @@ sdkVersion: kfp-2.0.0-alpha.2""")
image='alpine',
command=[
'sh', '-c', 'echo "$0"',
structures.ConcatPlaceholder(items=[
structures.InputValuePlaceholder(
input_name='input1'), '+',
structures.InputValuePlaceholder(
placeholders.ConcatPlaceholder(items=[
placeholders.InputValuePlaceholder(
input_name='input1'),
placeholders.InputValuePlaceholder(
input_name='input2')
])
],