# pipelines/sdk/python/kfp/local/placeholder_utils.py
#
# 356 lines
# 13 KiB
# Python

# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for working with placeholders."""
import functools
import json
import random
import re
from typing import Any, Dict, List, Optional, Union
from kfp import dsl
def make_random_id() -> str:
    """Makes a random 8 digit integer as a string.

    The integer is drawn from the inclusive range [0, 99999999] and is
    left-padded with zeros, so the result is always exactly 8 characters
    long (e.g. '00001234') rather than a shorter string for small draws.

    Returns:
        A string of exactly 8 decimal digits.
    """
    return f'{random.randint(0, 99999999):08d}'
def replace_placeholders(
    full_command: List[str],
    executor_input_dict: Dict[str, Any],
    pipeline_resource_name: str,
    task_resource_name: str,
    pipeline_root: str,
    unique_pipeline_id: str,
) -> List[str]:
    """Resolves every placeholder found in a task's command.

    Call this only once per task: the task's random ID is generated inside
    this function, so calling it again for the same task yields a different
    task ID.

    The work happens in three passes:
      1. String parameter values inside the executor input that reference
         other inputs are resolved.
      2. Struct placeholders in the command are expanded using the names of
         the provided inputs, and the result is flattened.
      3. Each remaining element has its individual placeholders resolved.

    Returns:
        The command as a flat list of strings with placeholders resolved.

    Raises:
        ValueError: If a resolved element is neither None, a string, nor a
            list.
    """
    task_id = make_random_id()

    # Keyword arguments shared by every resolver call made below.
    context = dict(
        pipeline_resource_name=pipeline_resource_name,
        task_resource_name=task_resource_name,
        pipeline_root=pipeline_root,
        pipeline_job_id=unique_pipeline_id,
        pipeline_task_id=task_id,
    )

    executor_input_dict = resolve_self_references_in_executor_input(
        executor_input_dict=executor_input_dict,
        **context,
    )

    present_inputs = get_provided_inputs(executor_input_dict)
    expanded = flatten_list([
        resolve_struct_placeholders(raw, present_inputs)
        for raw in full_command
    ])

    resolved_command = []
    for piece in expanded:
        outcome = resolve_individual_placeholder(
            element=piece,
            executor_input_dict=executor_input_dict,
            **context,
        )
        if outcome is None:
            # Nothing to contribute to the command for this element.
            continue
        if isinstance(outcome, str):
            resolved_command.append(outcome)
        elif isinstance(outcome, list):
            resolved_command.extend(outcome)
        else:
            raise ValueError(
                f'Got unknown command element {outcome} of type {type(outcome)}.'
            )
    return resolved_command
def resolve_self_references_in_executor_input(
    executor_input_dict: Dict[str, Any],
    pipeline_resource_name: str,
    task_resource_name: str,
    pipeline_root: str,
    pipeline_job_id: str,
    pipeline_task_id: str,
) -> Dict[str, Any]:
    """Resolves string parameter values that embed other placeholders.

    A string parameter in the ExecutorInput message may itself contain a
    placeholder pointing at another parameter of the same message. This
    occurs when passing f-strings to a component. For example, given the
    ExecutorInput message:

        {'inputs': {'parameterValues': {
            'pipelinechannel--identity-Output': 'foo',
            'string': "{{$.inputs.parameters['pipelinechannel--identity-Output']}}-bar"}},
         'outputs': ...}

    the embedded placeholder points at 'pipelinechannel--identity-Output',
    whose value is 'foo', so the 'string' parameter is rewritten to
    'foo-bar'.

    Every string value under inputs.parameterValues is passed through
    resolve_individual_placeholder and written back under the same key.
    Non-string values are left untouched.

    Returns:
        The same executor_input_dict object, updated in place.
    """
    parameter_values = executor_input_dict.get('inputs', {}).get(
        'parameterValues', {})

    # Only existing keys are reassigned, so the dict size never changes
    # while it is being iterated.
    for name, raw_value in parameter_values.items():
        if not isinstance(raw_value, str):
            continue
        parameter_values[name] = resolve_individual_placeholder(
            raw_value,
            executor_input_dict=executor_input_dict,
            pipeline_resource_name=pipeline_resource_name,
            task_resource_name=task_resource_name,
            pipeline_root=pipeline_root,
            pipeline_job_id=pipeline_job_id,
            pipeline_task_id=pipeline_task_id,
        )
    return executor_input_dict
def recursively_resolve_json_dict_placeholders(
    obj: Any,
    executor_input_dict: Dict[str, Any],
    pipeline_resource_name: str,
    task_resource_name: str,
    pipeline_root: str,
    pipeline_job_id: str,
    pipeline_task_id: str,
) -> Any:
    """Walks a JSON-like object and resolves placeholders in its strings.

    Lists are rebuilt element by element, dictionaries are rebuilt with both
    their keys and values resolved, and strings are resolved with
    resolve_individual_placeholder. Any other object is returned as is.

    Recursion is used deliberately: these objects are very unlikely to be
    nested deeply enough to hit the recursion limit, and an iterative
    version would be much harder to read.
    """
    context = dict(
        executor_input_dict=executor_input_dict,
        pipeline_resource_name=pipeline_resource_name,
        task_resource_name=task_resource_name,
        pipeline_root=pipeline_root,
        pipeline_job_id=pipeline_job_id,
        pipeline_task_id=pipeline_task_id,
    )

    def descend(child: Any) -> Any:
        return recursively_resolve_json_dict_placeholders(child, **context)

    if isinstance(obj, str):
        return resolve_individual_placeholder(element=obj, **context)
    if isinstance(obj, list):
        return [descend(entry) for entry in obj]
    if isinstance(obj, dict):
        return {descend(name): descend(entry) for name, entry in obj.items()}
    return obj
def flatten_list(l: List[Union[str, list, None]]) -> List[str]:
    """Iteratively flattens arbitrarily deeply nested lists, filtering out
    elements that are None.

    Elements are returned in depth-first, left-to-right order. The input
    list and its nested lists are not modified.

    Args:
        l: A list whose elements are values, None, or further nested lists.

    Returns:
        A new flat list holding every non-None, non-list element.
    """
    result = []
    # The pending elements are kept reversed so that the next one to visit
    # sits at the end of the list, where pop() and extend() are cheap. This
    # avoids shifting the whole list on every step.
    pending = list(reversed(l))
    while pending:
        element = pending.pop()
        if isinstance(element, list):
            pending.extend(reversed(element))
        elif element is not None:
            result.append(element)
    return result
def get_provided_inputs(executor_input_dict: Dict[str, Any]) -> List[str]:
    """Returns the names of the inputs that have a non-None value.

    Names are read from inputs.parameterValues first and inputs.artifacts
    second. A missing 'inputs', 'parameterValues' or 'artifacts' entry is
    treated as empty.

    Args:
        executor_input_dict: Dictionary form of the ExecutorInput message.

    Returns:
        The parameter names followed by the artifact names, keeping only
        those whose value is not None.
    """
    inputs = executor_input_dict.get('inputs', {})
    return [
        name
        for section in ('parameterValues', 'artifacts')
        for name, value in inputs.get(section, {}).items()
        if value is not None
    ]
def get_value_using_path(
    dictionary: Dict[str, Any],
    path: List[Union[str, int]],
) -> Optional[Any]:
    """Looks up a nested value by following a path of keys and indices.

    Each path entry is used to subscript the value reached so far, so an
    entry may be a dictionary key or an integer index into a list.

    Args:
        dictionary: The outermost container to start the lookup from.
        path: The keys / indices to follow, outermost first.

    Returns:
        The value found at the end of the path, or None if a key is missing
        or a list index is out of range along the way.

    Raises:
        ValueError: If path is empty.
    """
    if not path:
        raise ValueError('path cannot be empty.')

    current = dictionary
    try:
        for step in path:
            current = current[step]
    except (KeyError, IndexError):
        # IndexError covers integer steps into a list that is too short,
        # which is a missing value just like an absent dictionary key.
        return None
    return current
def convert_placeholder_parts_to_path(
        parts: List[str]) -> List[Union[str, int]]:
    """Converts the parts of a placeholder into an executor input path.

    The following rewrites are applied:
      * ['inputs', 'parameters', ...] becomes
        ['inputs', 'parameterValues', ...].
      * For ['outputs', 'parameters', <name>, 'output_file'], the last part
        becomes 'outputFile'.
      * For [<inputs|outputs>, 'artifacts', <name>, ...], the parts
        'artifacts' and 0 are inserted after the name, to reach the first
        entry of the nested artifact list. A following 'path' part becomes
        'uri', since for local execution the path is the uri.

    The input list is not modified. Placeholders with fewer parts than a
    rewrite needs are left alone by that rewrite instead of raising
    IndexError.

    Args:
        parts: The non-empty parts of a placeholder, e.g.
            ['inputs', 'parameters', 'text'].

    Returns:
        A new list holding the path, e.g.
        ['inputs', 'parameterValues', 'text'].
    """
    # Work on a copy so the caller's list is never changed.
    path: List[Union[str, int]] = list(parts)
    if len(path) < 2:
        return path

    # if inputs, parameters --> parameterValues
    if path[0] == 'inputs' and path[1] == 'parameters':
        path[1] = 'parameterValues'

    # if outputs, parameter output_file --> outputFile
    if (len(path) > 3 and path[0] == 'outputs' and
            path[1] == 'parameters' and path[3] == 'output_file'):
        path[3] = 'outputFile'

    # if artifacts (and an artifact name follows)...
    if path[1] == 'artifacts' and len(path) > 2:
        # ...need to get nested artifact object...
        path.insert(3, 'artifacts')
        # ...and first entry in list with index 0
        path.insert(4, 0)
        # for local, path is the uri
        if len(path) > 5 and path[5] == 'path':
            path[5] = 'uri'

    return path
def resolve_io_placeholders(
    executor_input: Dict[str, Any],
    command: str,
) -> str:
    """Substitutes keyed input/output placeholders found in command.

    Every '{{$.<path>}}' occurrence is looked up in executor_input and
    replaced by the value found there. String values are inserted as is;
    any other value, including None for a path that does not resolve, is
    inserted as its JSON serialization.

    executor_input should not contain any unresolved placeholders.

    Raises:
        ValueError: If a placeholder contains 'json_escape'.
    """
    # e.g., inner = "inputs.parameters[''text'']"
    for inner in re.findall(r'\{\{\$\.(.*?)\}\}', command):
        if 'json_escape' in inner:
            raise ValueError('JSON escape placeholders are not supported.')

        # Splitting on '.', brackets and quotes leaves empty fragments
        # behind, which are dropped here.
        # e.g., tokens = ['inputs', 'parameters', 'text']
        tokens = [
            token for token in re.split(r'\.|\[|\]|\'\'|\'', inner) if token
        ]
        # e.g., lookup_path = ['inputs', 'parameterValues', 'text']
        lookup_path = convert_placeholder_parts_to_path(tokens)
        found = get_value_using_path(executor_input, lookup_path)

        # even if found is None, it should json.dumps to null and still
        # resolve the placeholder
        replacement = found if isinstance(found, str) else json.dumps(found)
        command = command.replace('{{$.' + inner + '}}', replacement)

    return command
def resolve_struct_placeholders(
    placeholder: str,
    provided_inputs: List[str],
) -> Union[str, List[Any], None]:
    """Resolves a serialized IfPresent or Concat placeholder.

    A command element is treated as a struct placeholder only when it starts
    with '{"Concat": ' or '{"IfPresent": '; any other element is returned
    unchanged.

    Args:
        placeholder: A single command element.
        provided_inputs: Names of the inputs that were provided. An
            IfPresent struct resolves to its 'Then' payload when its
            'InputName' is among them and to its 'Else' payload otherwise.

    Returns:
        A string, a list (for an IfPresent whose selected payload is a
        list), or None (for an IfPresent that is not satisfied and has no
        'Else').

    Raises:
        ValueError: If a list is found where a struct or string is expected,
            or if a struct's first key is neither 'Concat' nor 'IfPresent'.
    """

    # throughout, filter out None for the case where IfPresent False and no else
    def filter_none(l: List[Any]) -> List[Any]:
        return [e for e in l if e is not None]

    def resolve_branch(branch: Any) -> Union[str, List[Any], None]:
        # Resolves the 'Then' or 'Else' payload of an IfPresent struct.
        if branch is None or isinstance(branch, str):
            return branch
        if isinstance(branch, list):
            return filter_none([recursively_resolve_struct(p) for p in branch])
        if isinstance(branch, dict):
            return recursively_resolve_struct(branch)
        # Payloads of any other type resolve to nothing.
        return None

    def recursively_resolve_struct(
        placeholder: Union[str, Dict[str, Any]],
    ) -> Union[str, List[Any], None]:
        if isinstance(placeholder, str):
            return placeholder
        elif isinstance(placeholder, list):
            raise ValueError(
                f"You have an incorrectly nested {dsl.IfPresentPlaceholder!r} with a list provided for 'then' or 'else'."
            )

        # The kind of struct is identified by its first key.
        first_key = list(placeholder.keys())[0]
        if first_key == 'Concat':
            concat = [
                recursively_resolve_struct(p) for p in placeholder['Concat']
            ]
            return ''.join(filter_none(concat))
        if first_key == 'IfPresent':
            inner_struct = placeholder['IfPresent']
            if inner_struct['InputName'] in provided_inputs:
                return resolve_branch(inner_struct['Then'])
            # 'Else' is optional; a missing 'Else' resolves to None.
            return resolve_branch(inner_struct.get('Else'))
        raise ValueError(
            f"Unsupported struct placeholder key {first_key!r}; expected 'Concat' or 'IfPresent'."
        )

    if placeholder.startswith(('{"Concat": ', '{"IfPresent": ')):
        return recursively_resolve_struct(json.loads(placeholder))
    return placeholder
def resolve_individual_placeholder(
    element: str,
    executor_input_dict: Dict[str, Any],
    pipeline_resource_name: str,
    task_resource_name: str,
    pipeline_root: str,
    pipeline_job_id: str,
    pipeline_task_id: str,
) -> str:
    """Resolves the placeholders contained in a single string.

    Constant placeholders, which are matched literally, are substituted
    first. The result is then handed to resolve_io_placeholders, which
    handles the placeholders that carry keys.
    """
    output_file = executor_input_dict['outputs']['outputFile']

    # Literal placeholder text mapped to the value that replaces it.
    constant_values = {
        r'{{$.outputs.output_file}}': output_file,
        r'{{$.outputMetadataUri}}': output_file,
        r'{{$}}': json.dumps(executor_input_dict),
        dsl.PIPELINE_JOB_NAME_PLACEHOLDER: pipeline_resource_name,
        dsl.PIPELINE_JOB_ID_PLACEHOLDER: pipeline_job_id,
        dsl.PIPELINE_TASK_NAME_PLACEHOLDER: task_resource_name,
        dsl.PIPELINE_TASK_ID_PLACEHOLDER: pipeline_task_id,
        dsl.PIPELINE_ROOT_PLACEHOLDER: pipeline_root,
    }

    resolved = element
    for literal, replacement in constant_values.items():
        resolved = resolved.replace(literal, replacement)

    # match non-constant placeholders (i.e., have key(s))
    return resolve_io_placeholders(executor_input_dict, resolved)