DSL refactor (#619)

* add comments

* relocate functions in the compiler to group similar functions together; make _build_conventional_artifact a nested function

* consolidate the sanitize functions in the dsl into one.

* more comments

* move all sanitization (op name, param name) from the dsl to the compiler

* sanitize pipelineparam name and op_name; remove format check in pipelineparam

* remove unit test for pipelineparam op_name format checking

* fix bug: correctly replace input in the argument list

* fix bug: replace arguments with found ones

* sanitize the file_outputs keys; match params in args/cmds against the whole serialized param string; verify both the param name and the ContainerOp name

* loosen the containerop and param name restrictions
Ning 2019-01-08 20:00:17 -08:00 committed by Kubernetes Prow Robot
parent 76f8b6b77b
commit d3c4add0a9
8 changed files with 180 additions and 146 deletions

View File

@@ -22,6 +22,7 @@ import yaml
from datetime import datetime
from .compiler import compiler
from .compiler import _k8s_helper
class Client(object):
@@ -171,7 +172,7 @@ class Client(object):
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_run.ApiParameter(name=compiler.Compiler()._sanitize_name(k), value=str(v))
api_params = [kfp_run.ApiParameter(name=_k8s_helper.K8sHelper.sanitize_k8s_name(k), value=str(v))
for k,v in params.items()]
key = kfp_run.models.ApiResourceKey(id=experiment_id,
type=kfp_run.models.ApiResourceType.EXPERIMENT)

View File

@@ -17,7 +17,7 @@ from kubernetes import client as k8s_client
from kubernetes import config
import time
import logging
import re
class K8sHelper(object):
""" Kubernetes Helper """
@@ -119,6 +119,13 @@ class K8sHelper(object):
self._delete_k8s_job(pod_name, yaml_spec)
return succ
@staticmethod
def sanitize_k8s_name(name):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
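For illustration, a standalone copy of this sanitizer applied to a few made-up names:

import re

def sanitize_k8s_name(name):
    # Lowercase, replace runs of characters outside [-0-9a-z] with '-',
    # collapse repeated dashes, then strip leading/trailing dashes.
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

print(sanitize_k8s_name('My Op_Name 1'))     # my-op-name-1
print(sanitize_k8s_name('Download data'))    # download-data
print(sanitize_k8s_name('--weird__NAME--'))  # weird-name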
@staticmethod
def convert_k8s_obj_to_json(k8s_obj):
"""

View File

@@ -38,132 +38,16 @@ class Compiler(object):
```
"""
def _sanitize_name(self, name):
"""From _make_kubernetes_name
_sanitize_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
def _pipelineparam_full_name(self, param):
"""_pipelineparam_full_name
"""_pipelineparam_full_name converts the names of pipeline parameters
to unique names in the argo yaml
Args:
param(PipelineParam): pipeline parameter
"""
if param.op_name:
return param.op_name + '-' + param.name
return self._sanitize_name(param.name)
def _build_conventional_artifact(self, name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}
def _process_args(self, raw_args, argument_inputs):
if not raw_args:
return []
processed_args = list(map(str, raw_args))
for i, _ in enumerate(processed_args):
for param in argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(
str(param), '{{inputs.parameters.%s}}' % full_name, str(processed_args[i]))
return processed_args
def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""
input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])
output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])
template = {
'name': op.name,
'container': {
'image': op.image,
}
}
processed_arguments = self._process_args(op.arguments, op.argument_inputs)
processed_command = self._process_args(op.command, op.argument_inputs)
if processed_arguments:
template['container']['args'] = processed_arguments
if processed_command:
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}
template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}
# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts
# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests
# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector
if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))
if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels
return template
return param.name
def _get_groups_for_ops(self, root_group):
"""Helper function to get belonging groups for each op.
@@ -327,6 +211,126 @@ class Compiler(object):
else:
return str(value_or_reference)
def _process_args(self, raw_args, argument_inputs):
if not raw_args:
return []
processed_args = list(map(str, raw_args))
for i, _ in enumerate(processed_args):
# unsanitized_argument_inputs stores a dict: string of sanitized param -> string of unsanitized param
matches = []
match = re.findall(r'{{pipelineparam:op=([\w\s\_-]*);name=([\w\s\_-]+);value=(.*?)}}', str(processed_args[i]))
matches += match
unsanitized_argument_inputs = {}
for x in list(set(matches)):
sanitized_str = str(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(x[1]), K8sHelper.sanitize_k8s_name(x[0]), x[2]))
unsanitized_argument_inputs[sanitized_str] = str(dsl.PipelineParam(x[1], x[0], x[2]))
if argument_inputs:
for param in argument_inputs:
if str(param) in unsanitized_argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(unsanitized_argument_inputs[str(param)], '{{inputs.parameters.%s}}' % full_name,
processed_args[i])
return processed_args
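For context, a self-contained sketch of the substitution this method performs. The argument string, op name, and parameter name below are made up; the real method works on dsl.PipelineParam objects via K8sHelper, but the serialized placeholder format is the one matched by the regex above:

import re

def sanitize(name):
    # Same transformation as K8sHelper.sanitize_k8s_name.
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).strip('-')

def rewrite(arg):
    # Find each serialized PipelineParam and rewrite it as an Argo input
    # reference named '<sanitized-op-name>-<sanitized-param-name>'.
    for op_name, name, value in re.findall(
            r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', arg):
        full_name = (sanitize(op_name) + '-' + sanitize(name)) if op_name else sanitize(name)
        serialized = '{{pipelineparam:op=%s;name=%s;value=%s}}' % (op_name, name, value)
        arg = arg.replace(serialized, '{{inputs.parameters.%s}}' % full_name)
    return arg

raw_arg = 'gs://bucket/{{pipelineparam:op=Download Data;name=output file;value=}}'
print(rewrite(raw_arg))  # gs://bucket/{{inputs.parameters.download-data-output-file}}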
def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""
def _build_conventional_artifact(name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}
processed_arguments = self._process_args(op.arguments, op.argument_inputs)
processed_command = self._process_args(op.command, op.argument_inputs)
input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])
output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])
template = {
'name': op.name,
'container': {
'image': op.image,
}
}
if processed_arguments:
template['container']['args'] = processed_arguments
if processed_command:
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}
template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}
# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts
# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests
# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector
if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))
if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels
return template
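To make the result concrete, this is roughly the shape of the dict the method returns for a hypothetical single-output op; every value below is illustrative, not taken from a real pipeline:

# Hypothetical op: image 'python:3.5-jessie', one pipeline-level input 'url',
# and file_outputs={'output': '/tmp/output.txt'}.
template = {
    'name': 'download-data',
    'container': {
        'image': 'python:3.5-jessie',
        'command': ['python', 'download.py'],
        'args': ['--url', '{{inputs.parameters.url}}'],   # rewritten by _process_args
    },
    'inputs': {'parameters': [{'name': 'url'}]},
    'outputs': {
        'parameters': [{
            'name': 'download-data-output',            # op name + '-' + param name
            'valueFrom': {'path': '/tmp/output.txt'},  # looked up in op.file_outputs
        }],
        'artifacts': [
            # Always appended (see _build_conventional_artifact); s3 blocks omitted here.
            {'name': 'mlpipeline-ui-metadata', 'path': '/mlpipeline-ui-metadata.json'},
            {'name': 'mlpipeline-metrics', 'path': '/mlpipeline-metrics.json'},
        ],
    },
}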
def _group_to_template(self, group, inputs, outputs, dependencies):
"""Generate template given an OpsGroup.
@@ -505,10 +509,10 @@ class Compiler(object):
raise ValueError('Please use a function with @dsl.pipeline decorator.')
pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
pipeline_name = self._sanitize_name(pipeline_name)
pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_name)
# Create the arg list with no default values and call pipeline function.
args_list = [dsl.PipelineParam(self._sanitize_name(arg_name))
args_list = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
with dsl.Pipeline(pipeline_name) as p:
pipeline_func(*args_list)
@@ -517,12 +521,36 @@ class Compiler(object):
self._validate_exit_handler(p)
# Fill in the default values.
args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name))
args_list_with_defaults = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
# Sanitize operator names and param names
sanitized_ops = {}
for op in p.ops.values():
sanitized_name = K8sHelper.sanitize_k8s_name(op.name)
op.name = sanitized_name
for param in op.inputs + op.argument_inputs:
param.name = K8sHelper.sanitize_k8s_name(param.name)
if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
for param in op.outputs.values():
param.name = K8sHelper.sanitize_k8s_name(param.name)
if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
if op.output is not None:
op.output.name = K8sHelper.sanitize_k8s_name(op.output.name)
op.output.op_name = K8sHelper.sanitize_k8s_name(op.output.op_name)
if op.file_outputs is not None:
sanitized_file_outputs = {}
for key in op.file_outputs.keys():
sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key]
op.file_outputs = sanitized_file_outputs
sanitized_ops[sanitized_name] = op
p.ops = sanitized_ops
workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow
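A minimal standalone sketch of the effect of this pass on one op's name and file_outputs keys; SimpleNamespace stands in for a dsl.ContainerOp and the names are made up:

import re
from types import SimpleNamespace

def sanitize(name):
    # Same transformation as K8sHelper.sanitize_k8s_name.
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).strip('-')

# Stand-in for a ContainerOp as the user declared it.
op = SimpleNamespace(name='Download Data',
                     file_outputs={'Output File': '/tmp/out.txt'})

# Sanitize the op name and re-key file_outputs so that _op_to_template can
# look paths up by the sanitized parameter name later on.
op.name = sanitize(op.name)
op.file_outputs = {sanitize(k): v for k, v in op.file_outputs.items()}

print(op.name)          # download-data
print(op.file_outputs)  # {'output-file': '/tmp/out.txt'}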

View File

@@ -27,7 +27,8 @@ class ContainerOp(object):
"""Create a new instance of ContainerOp.
Args:
name: the name of the op. Has to be unique within a pipeline.
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generate a unique new name in case of a conflict.
image: the container image name, such as 'python:3.5-jessie'
command: the command to run in the container.
If None, uses the default CMD defined in the container.
@@ -43,6 +44,10 @@ class ContainerOp(object):
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name))
self.human_name = name
self.name = _pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler)
self.image = image
@@ -60,7 +65,7 @@ class ContainerOp(object):
matches = []
for arg in (command or []) + (arguments or []):
match = re.findall(r'{{pipelineparam:op=([\w-]*);name=([\w-]+);value=(.*?)}}', str(arg))
match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg))
matches += match
self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2])
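To illustrate why the character classes were loosened, a quick comparison of the old and new patterns on a made-up serialized argument whose names contain spaces:

import re

old = r'{{pipelineparam:op=([\w-]*);name=([\w-]+);value=(.*?)}}'
new = r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}'

arg = '--input={{pipelineparam:op=Download Data;name=output file;value=}}'
print(re.findall(old, arg))  # [] -- the spaces break the old pattern
print(re.findall(new, arg))  # [('Download Data', 'output file', '')]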

View File

@@ -15,7 +15,6 @@
from . import _container_op
from . import _ops_group
import re
import sys
@@ -38,9 +37,6 @@ def pipeline(name, description):
return _pipeline
def _make_kubernetes_name(name):
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
class Pipeline():
"""A pipeline contains a list of operators.
@@ -106,22 +102,24 @@ class Pipeline():
Args:
op: An operator of ContainerOp or its inherited type.
Returns
op_name: a unique op name.
"""
kubernetes_name = _make_kubernetes_name(op.human_name)
step_id = kubernetes_name
op_name = op.human_name
#If there is an existing op with this name then generate a new name.
if step_id in self.ops:
if op_name in self.ops:
for i in range(2, sys.maxsize**10):
step_id = kubernetes_name + '-' + str(i)
if step_id not in self.ops:
op_name = op_name + '-' + str(i)
if op_name not in self.ops:
break
self.ops[step_id] = op
self.ops[op_name] = op
if not define_only:
self.groups[-1].ops.append(op)
return step_id
return op_name
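A simplified stand-in for this naming logic (the real method also stores the ContainerOp and appends it to the current OpsGroup):

import sys

def add_op(ops, human_name):
    # Append '-2', '-3', ... until the name no longer collides.
    op_name = human_name
    if op_name in ops:
        for i in range(2, sys.maxsize**10):
            op_name = op_name + '-' + str(i)
            if op_name not in ops:
                break
    ops[op_name] = human_name
    return op_name

ops = {}
print(add_op(ops, 'Download Data'))  # Download Data
print(add_op(ops, 'Download Data'))  # Download Data-2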
def push_ops_group(self, group: _ops_group.OpsGroup):
"""Push an OpsGroup into the stack.

View File

@@ -44,12 +44,9 @@ class PipelineParam(object):
and value are set.
"""
valid_name_regex = r'^[A-Za-z][A-Za-z0-9-]*$'
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError('Only letters, numbers and "-" allowed in name. Must begin with letter.')
if op_name and not re.match(valid_name_regex, op_name):
raise ValueError('Only letters, numbers and "-" allowed in op_name. Must begin with letter.')
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name))
if op_name and value:
raise ValueError('op_name and value cannot be both set.')
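For reference, the loosened pattern accepts spaces and underscores while still requiring a leading letter; the sample names are made up:

import re

valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
for name in ['param1', 'param 1', 'my_param', '1param', 'bad!name']:
    print(name, bool(re.match(valid_name_regex, name)))
# param1 True
# param 1 True    (spaces now allowed)
# my_param True   (underscores now allowed)
# 1param False    (must begin with a letter)
# bad!name False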

View File

@@ -14,6 +14,7 @@
import unittest
import sys
import pipeline_tests
import pipeline_param_tests

View File

@@ -24,9 +24,6 @@ class TestPipelineParam(unittest.TestCase):
with self.assertRaises(ValueError):
p = PipelineParam(name='123_abc')
with self.assertRaises(ValueError):
p = PipelineParam(name='param1', op_name='a b')
def test_str_repr(self):
"""Test string representation."""