SDK/Components - Simplified _create_task_factory_from_component_spec function (#662)

This commit is contained in:
Alexey Volkov 2019-01-14 18:02:16 -08:00 committed by Kubernetes Prow Robot
parent 6cce1bc796
commit fd282d67cd
2 changed files with 154 additions and 138 deletions

View File

@ -186,156 +186,50 @@ def _make_name_unique_by_adding_index(name:str, collection, delimiter:str):
return unique_name
#Holds the transformation functions that are called each time TaskSpec instance is created from a component. If there are multiple handlers, the last one is used.
_created_task_transformation_handler = []
#TODO: Move to the dsl.Pipeline context class
from . import _dsl_bridge
_created_task_transformation_handler.append(_dsl_bridge.create_container_op_from_task)
#TODO: Refactor the function to make it shorter
def _create_task_factory_from_component_spec(component_spec:ComponentSpec, component_filename=None):
def _create_task_factory_from_component_spec(component_spec:ComponentSpec, component_filename=None, component_ref: ComponentReference = None):
name = component_spec.name or _default_component_name
description = component_spec.description
inputs_list = component_spec.inputs or [] #List[InputSpec]
outputs_list = component_spec.outputs or [] #List[OutputSpec]
inputs_dict = {port.name: port for port in inputs_list}
#Creating the name translation tables : Original <-> Pythonic
input_name_to_pythonic = {}
pythonic_name_to_input_name = {}
input_name_to_kubernetes = {}
output_name_to_kubernetes = {}
kubernetes_name_to_input_name = {}
kubernetes_name_to_output_name = {}
for io_port in inputs_list:
pythonic_name = _sanitize_python_function_name(io_port.name)
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_input_name, '_')
input_name_to_pythonic[io_port.name] = pythonic_name
pythonic_name_to_input_name[pythonic_name] = io_port.name
kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_input_name, '-')
input_name_to_kubernetes[io_port.name] = kubernetes_name
kubernetes_name_to_input_name[kubernetes_name] = io_port.name
if component_ref is None:
component_ref = ComponentReference(name=component_spec.name or component_filename or _default_component_name)
component_ref._component_spec = component_spec
for io_port in outputs_list:
kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_output_name, '-')
output_name_to_kubernetes[io_port.name] = kubernetes_name
kubernetes_name_to_output_name[kubernetes_name] = io_port.name
container_spec = component_spec.implementation.container
container_image = container_spec.image
file_outputs_from_def = OrderedDict()
if container_spec.file_outputs != None:
for param, path in container_spec.file_outputs.items():
output_key = output_name_to_kubernetes[param]
file_outputs_from_def[output_key] = path
def create_container_op_with_expanded_arguments(pythonic_input_argument_values):
file_outputs = file_outputs_from_def.copy()
def expand_command_part(arg): #input values with original names
#(Union[str,Mapping[str, Any]]) -> Union[str,List[str]]
if arg is None:
return None
if isinstance(arg, (str, int, float, bool)):
return str(arg)
if isinstance(arg, InputValuePlaceholder):
port_name = arg.input_name
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return str(input_value)
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(port_name))
if isinstance(arg, InputPathPlaceholder):
port_name = arg.input_name
input_filename = _generate_input_file_name(port_name)
input_key = input_name_to_kubernetes[port_name]
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return input_filename
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(port_name))
elif isinstance(arg, OutputPathPlaceholder):
port_name = arg.output_name
output_filename = _generate_output_file_name(port_name)
output_key = output_name_to_kubernetes[port_name]
if output_key in file_outputs:
if file_outputs[output_key] != output_filename:
raise ValueError('Conflicting output files specified for port {}: {} and {}'.format(port_name, file_outputs[output_key], output_filename))
else:
file_outputs[output_key] = output_filename
return output_filename
elif isinstance(arg, ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)
elif isinstance(arg, IfPlaceholder):
arg = arg.if_structure
condition_result = expand_command_part(arg.condition)
from distutils.util import strtobool
condition_result_bool = condition_result and strtobool(condition_result) #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
result_node = arg.then_value if condition_result_bool else arg.else_value
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result
elif isinstance(arg, IsPresentPlaceholder):
pythonic_input_name = input_name_to_pythonic[arg.input_name]
argument_is_present = pythonic_input_argument_values[pythonic_input_name] is not None
return str(argument_is_present)
else:
raise TypeError('Unrecognized argument type: {}'.format(arg))
def expand_argument_list(argument_list):
expanded_list = []
if argument_list is not None:
for part in argument_list:
expanded_part = expand_command_part(part)
if expanded_part is not None:
if isinstance(expanded_part, list):
expanded_list.extend(expanded_part)
else:
expanded_list.append(str(expanded_part))
return expanded_list
expanded_command = expand_argument_list(container_spec.command)
expanded_args = expand_argument_list(container_spec.args)
#Working around Python's variable scoping. Do not write to variable from global scope as that makes the variable local.
file_outputs_to_pass = file_outputs
if file_outputs_to_pass == {}:
file_outputs_to_pass = None
from . import _dsl_bridge
return _dsl_bridge._task_object_factory(
name=name,
container_image=container_image,
command=expanded_command,
arguments=expanded_args,
file_outputs=file_outputs_to_pass,
def create_task_from_component_and_arguments(pythonic_arguments):
#Converting the argument names and not passing None arguments
valid_argument_types = (str, int, float, bool, GraphInputArgument, TaskOutputArgument) #Hack for passed PipelineParams. TODO: Remove the hack once they're no longer passed here.
arguments = {
pythonic_name_to_input_name[k]: (v if isinstance(v, valid_argument_types) else str(v))
for k, v in pythonic_arguments.items()
if v is not None
}
task = TaskSpec(
component_ref=component_ref,
arguments=arguments,
)
if _created_task_transformation_handler:
task = _created_task_transformation_handler[-1](task)
return task
import inspect
from . import _dynamic
@ -346,7 +240,7 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo
factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.
return _dynamic.create_function_from_parameters(
create_container_op_with_expanded_arguments,
create_task_from_component_and_arguments,
factory_function_parameters,
documentation=description,
func_name=name,

View File

@ -12,9 +12,120 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from ._structures import ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
from ._components import _generate_output_file_name, _default_component_name
def create_container_op_from_task(task_spec: TaskSpec):
argument_values = task_spec.arguments
component_spec = task_spec.component_ref._component_spec
inputs_dict = {input_spec.name: input_spec for input_spec in component_spec.inputs or []}
container_spec = component_spec.implementation.container
output_paths = OrderedDict() #Preserving the order to make the kubernetes output names deterministic
unconfigurable_output_paths = container_spec.file_outputs or {}
for output in component_spec.outputs or []:
if output.name in unconfigurable_output_paths:
output_paths[output.name] = unconfigurable_output_paths[output.name]
def expand_command_part(arg): #input values with original names
#(Union[str,Mapping[str, Any]]) -> Union[str,List[str]]
if arg is None:
return None
if isinstance(arg, (str, int, float, bool)):
return str(arg)
if isinstance(arg, InputValuePlaceholder):
input_name = arg.input_name
input_value = argument_values.get(input_name, None)
if input_value is not None:
return str(input_value)
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError('No value provided for input {}'.format(input_name))
if isinstance(arg, InputPathPlaceholder):
input_name = arg.input_name
input_value = argument_values.get(input_name, None)
if input_value is not None:
raise ValueError('ContainerOp does not support input artifacts - input {}'.format(input_name))
#return input_value
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(input_name))
elif isinstance(arg, OutputPathPlaceholder):
output_name = arg.output_name
output_filename = _generate_output_file_name(output_name)
if arg.output_name in output_paths:
if output_paths[output_name] != output_filename:
raise ValueError('Conflicting output files specified for port {}: {} and {}'.format(output_name, output_paths[output_name], output_filename))
else:
output_paths[output_name] = output_filename
return output_filename
elif isinstance(arg, ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)
elif isinstance(arg, IfPlaceholder):
arg = arg.if_structure
condition_result = expand_command_part(arg.condition)
from distutils.util import strtobool
condition_result_bool = condition_result and strtobool(condition_result) #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
result_node = arg.then_value if condition_result_bool else arg.else_value
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result
elif isinstance(arg, IsPresentPlaceholder):
argument_is_present = argument_values.get(arg.input_name, None) is not None
return str(argument_is_present)
else:
raise TypeError('Unrecognized argument type: {}'.format(arg))
def expand_argument_list(argument_list):
expanded_list = []
if argument_list is not None:
for part in argument_list:
expanded_part = expand_command_part(part)
if expanded_part is not None:
if isinstance(expanded_part, list):
expanded_list.extend(expanded_part)
else:
expanded_list.append(str(expanded_part))
return expanded_list
expanded_command = expand_argument_list(container_spec.command)
expanded_args = expand_argument_list(container_spec.args)
return _task_object_factory(
name=component_spec.name or _default_component_name,
container_image=container_spec.image,
command=expanded_command,
arguments=expanded_args,
output_paths=output_paths,
)
_dummy_pipeline=None
def _create_task_object(name:str, container_image:str, command=None, arguments=None, file_outputs=None):
def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None):
from .. import dsl
global _dummy_pipeline
need_dummy = dsl.Pipeline._default_pipeline is None
@ -23,12 +134,23 @@ def _create_task_object(name:str, container_image:str, command=None, arguments=N
_dummy_pipeline = dsl.Pipeline('dummy pipeline')
_dummy_pipeline.__enter__()
from ._components import _sanitize_kubernetes_resource_name, _make_name_unique_by_adding_index
output_name_to_kubernetes = {}
kubernetes_name_to_output_name = {}
for output_name in (output_paths or {}).keys():
kubernetes_name = _sanitize_kubernetes_resource_name(output_name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_output_name, '-')
output_name_to_kubernetes[output_name] = kubernetes_name
kubernetes_name_to_output_name[kubernetes_name] = output_name
output_paths_for_container_op = {output_name_to_kubernetes[name]: path for name, path in output_paths.items()}
task = dsl.ContainerOp(
name=name,
image=container_image,
command=command,
arguments=arguments,
file_outputs=file_outputs,
file_outputs=output_paths_for_container_op,
)
if need_dummy:
@ -37,4 +159,4 @@ def _create_task_object(name:str, container_image:str, command=None, arguments=N
return task
_task_object_factory=_create_task_object
_task_object_factory=_create_container_op_from_resolved_task