SDK/DSL/Compiler - Fixed compilation of dsl.Condition (#28)
* Fixed compilation of dsl.Conditional The compiler no longer produced intermediate steps. * Got rid of _create_new_groups * Changed the sub_group.type check * Fix tfx name bug in the tfma sample test (#67) * fix tfx name bug * update release build for the data publish
This commit is contained in:
parent
2e3b328468
commit
98e4d2f881
|
|
@ -289,14 +289,17 @@ class Compiler(object):
|
||||||
dependencies[downstream_groups[0]].add(upstream_groups[0])
|
dependencies[downstream_groups[0]].add(upstream_groups[0])
|
||||||
return dependencies
|
return dependencies
|
||||||
|
|
||||||
def _create_condition(self, condition):
|
def _resolve_value_or_reference(self, value_or_reference, inputs):
|
||||||
left = ('{{inputs.parameters.%s}}' % self._param_full_name(condition.operand1)
|
if isinstance(value_or_reference, dsl.PipelineParam):
|
||||||
if isinstance(condition.operand1, dsl.PipelineParam)
|
parameter_name = self._param_full_name(value_or_reference)
|
||||||
else str(condition.operand1))
|
task_names = [task_name for param_name, task_name in inputs if param_name == parameter_name]
|
||||||
right = ('{{inputs.parameters.%s}}' % self._param_full_name(condition.operand2)
|
if task_names:
|
||||||
if isinstance(condition.operand2, dsl.PipelineParam)
|
task_name = task_names[0]
|
||||||
else str(condition.operand2))
|
return '{{tasks.%s.outputs.parameters.%s}}' % (task_name, parameter_name)
|
||||||
return ('%s == %s' % (left, right))
|
else:
|
||||||
|
return '{{inputs.parameters.%s}}' % parameter_name
|
||||||
|
else:
|
||||||
|
return str(value_or_reference)
|
||||||
|
|
||||||
def _group_to_template(self, group, inputs, outputs, dependencies):
|
def _group_to_template(self, group, inputs, outputs, dependencies):
|
||||||
"""Generate template given an OpsGroup.
|
"""Generate template given an OpsGroup.
|
||||||
|
|
@ -326,89 +329,56 @@ class Compiler(object):
|
||||||
template_outputs.sort(key=lambda x: x['name'])
|
template_outputs.sort(key=lambda x: x['name'])
|
||||||
template['outputs'] = {'parameters': template_outputs}
|
template['outputs'] = {'parameters': template_outputs}
|
||||||
|
|
||||||
if group.type == 'condition':
|
# Generate tasks section.
|
||||||
# This is a workaround for the fact that argo does not support conditions in DAG mode.
|
tasks = []
|
||||||
# Basically, we insert an extra group that contains only the original group. The extra group
|
for sub_group in group.groups + group.ops:
|
||||||
# operates in "step" mode where condition is supported.
|
task = {
|
||||||
only_child = group.groups[0]
|
'name': sub_group.name,
|
||||||
step = {
|
'template': sub_group.name,
|
||||||
'name': only_child.name,
|
|
||||||
'template': only_child.name,
|
|
||||||
}
|
}
|
||||||
if inputs.get(only_child.name, None):
|
|
||||||
|
if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition':
|
||||||
|
subgroup_inputs = inputs.get(sub_group.name, [])
|
||||||
|
condition = sub_group.condition
|
||||||
|
condition_operation = '=='
|
||||||
|
operand1_value = self._resolve_value_or_reference(condition.operand1, subgroup_inputs)
|
||||||
|
operand2_value = self._resolve_value_or_reference(condition.operand2, subgroup_inputs)
|
||||||
|
task['when'] = '{} {} {}'.format(operand1_value, condition_operation, operand2_value)
|
||||||
|
|
||||||
|
# Generate dependencies section for this task.
|
||||||
|
if dependencies.get(sub_group.name, None):
|
||||||
|
group_dependencies = list(dependencies[sub_group.name])
|
||||||
|
group_dependencies.sort()
|
||||||
|
task['dependencies'] = group_dependencies
|
||||||
|
|
||||||
|
# Generate arguments section for this task.
|
||||||
|
if inputs.get(sub_group.name, None):
|
||||||
arguments = []
|
arguments = []
|
||||||
for param_name, dependent_name in inputs[only_child.name]:
|
for param_name, dependent_name in inputs[sub_group.name]:
|
||||||
arguments.append({
|
if dependent_name:
|
||||||
|
# The value comes from an upstream sibling.
|
||||||
|
arguments.append({
|
||||||
|
'name': param_name,
|
||||||
|
'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name)
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
# The value comes from its parent.
|
||||||
|
arguments.append({
|
||||||
'name': param_name,
|
'name': param_name,
|
||||||
'value': '{{inputs.parameters.%s}}' % param_name
|
'value': '{{inputs.parameters.%s}}' % param_name
|
||||||
})
|
})
|
||||||
arguments.sort(key=lambda x: x['name'])
|
arguments.sort(key=lambda x: x['name'])
|
||||||
step['arguments'] = {'parameters': arguments}
|
task['arguments'] = {'parameters': arguments}
|
||||||
step['when'] = self._create_condition(group.condition)
|
tasks.append(task)
|
||||||
template['steps'] = [[step]]
|
tasks.sort(key=lambda x: x['name'])
|
||||||
else:
|
template['dag'] = {'tasks': tasks}
|
||||||
# Generate tasks section.
|
|
||||||
tasks = []
|
|
||||||
for sub_group in group.groups + group.ops:
|
|
||||||
task = {
|
|
||||||
'name': sub_group.name,
|
|
||||||
'template': sub_group.name,
|
|
||||||
}
|
|
||||||
# Generate dependencies section for this task.
|
|
||||||
if dependencies.get(sub_group.name, None):
|
|
||||||
group_dependencies = list(dependencies[sub_group.name])
|
|
||||||
group_dependencies.sort()
|
|
||||||
task['dependencies'] = group_dependencies
|
|
||||||
|
|
||||||
# Generate arguments section for this task.
|
|
||||||
if inputs.get(sub_group.name, None):
|
|
||||||
arguments = []
|
|
||||||
for param_name, dependent_name in inputs[sub_group.name]:
|
|
||||||
if dependent_name:
|
|
||||||
# The value comes from an upstream sibling.
|
|
||||||
arguments.append({
|
|
||||||
'name': param_name,
|
|
||||||
'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name)
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
# The value comes from its parent.
|
|
||||||
arguments.append({
|
|
||||||
'name': param_name,
|
|
||||||
'value': '{{inputs.parameters.%s}}' % param_name
|
|
||||||
})
|
|
||||||
arguments.sort(key=lambda x: x['name'])
|
|
||||||
task['arguments'] = {'parameters': arguments}
|
|
||||||
tasks.append(task)
|
|
||||||
tasks.sort(key=lambda x: x['name'])
|
|
||||||
template['dag'] = {'tasks': tasks}
|
|
||||||
return template
|
return template
|
||||||
|
|
||||||
def _create_new_groups(self, root_group):
|
|
||||||
"""Create a copy of the input group, and insert extra groups for conditions."""
|
|
||||||
|
|
||||||
new_group = copy.deepcopy(root_group)
|
|
||||||
|
|
||||||
def _insert_group_for_condition_helper(group):
|
|
||||||
for i, g in enumerate(group.groups):
|
|
||||||
if g.type == 'condition':
|
|
||||||
child_condition_group = dsl.OpsGroup('condition-child', g.name + '-child')
|
|
||||||
child_condition_group.ops = g.ops
|
|
||||||
child_condition_group.groups = g.groups
|
|
||||||
g.groups = [child_condition_group]
|
|
||||||
g.ops = list()
|
|
||||||
_insert_group_for_condition_helper(child_condition_group)
|
|
||||||
else:
|
|
||||||
_insert_group_for_condition_helper(g)
|
|
||||||
|
|
||||||
_insert_group_for_condition_helper(new_group)
|
|
||||||
return new_group
|
|
||||||
|
|
||||||
def _create_templates(self, pipeline):
|
def _create_templates(self, pipeline):
|
||||||
"""Create all groups and ops templates in the pipeline."""
|
"""Create all groups and ops templates in the pipeline."""
|
||||||
|
|
||||||
# This is needed only because Argo does not support condition in DAG mode.
|
new_root_group = pipeline.groups[0]
|
||||||
# Revisit when https://github.com/argoproj/argo/issues/921 is fixed.
|
|
||||||
new_root_group = self._create_new_groups(pipeline.groups[0])
|
|
||||||
|
|
||||||
op_groups = self._get_groups_for_ops(new_root_group)
|
op_groups = self._get_groups_for_ops(new_root_group)
|
||||||
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups)
|
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups)
|
||||||
|
|
|
||||||
|
|
@ -21,18 +21,6 @@ spec:
|
||||||
entrypoint: pipeline-flip-coin
|
entrypoint: pipeline-flip-coin
|
||||||
serviceAccountName: pipeline-runner
|
serviceAccountName: pipeline-runner
|
||||||
templates:
|
templates:
|
||||||
- inputs:
|
|
||||||
parameters:
|
|
||||||
- name: flip-output
|
|
||||||
name: condition-1
|
|
||||||
steps:
|
|
||||||
- - arguments:
|
|
||||||
parameters:
|
|
||||||
- name: flip-output
|
|
||||||
value: '{{inputs.parameters.flip-output}}'
|
|
||||||
name: condition-1-child
|
|
||||||
template: condition-1-child
|
|
||||||
when: '{{inputs.parameters.flip-output}} == heads'
|
|
||||||
- dag:
|
- dag:
|
||||||
tasks:
|
tasks:
|
||||||
- arguments:
|
- arguments:
|
||||||
|
|
@ -45,6 +33,7 @@ spec:
|
||||||
- flip-again
|
- flip-again
|
||||||
name: condition-2
|
name: condition-2
|
||||||
template: condition-2
|
template: condition-2
|
||||||
|
when: '{{tasks.flip-again.outputs.parameters.flip-again-output}} == tails'
|
||||||
- arguments:
|
- arguments:
|
||||||
parameters:
|
parameters:
|
||||||
- name: flip-output
|
- name: flip-output
|
||||||
|
|
@ -54,22 +43,7 @@ spec:
|
||||||
inputs:
|
inputs:
|
||||||
parameters:
|
parameters:
|
||||||
- name: flip-output
|
- name: flip-output
|
||||||
name: condition-1-child
|
name: condition-1
|
||||||
- inputs:
|
|
||||||
parameters:
|
|
||||||
- name: flip-again-output
|
|
||||||
- name: flip-output
|
|
||||||
name: condition-2
|
|
||||||
steps:
|
|
||||||
- - arguments:
|
|
||||||
parameters:
|
|
||||||
- name: flip-again-output
|
|
||||||
value: '{{inputs.parameters.flip-again-output}}'
|
|
||||||
- name: flip-output
|
|
||||||
value: '{{inputs.parameters.flip-output}}'
|
|
||||||
name: condition-2-child
|
|
||||||
template: condition-2-child
|
|
||||||
when: '{{inputs.parameters.flip-again-output}} == tails'
|
|
||||||
- dag:
|
- dag:
|
||||||
tasks:
|
tasks:
|
||||||
- arguments:
|
- arguments:
|
||||||
|
|
@ -84,19 +58,7 @@ spec:
|
||||||
parameters:
|
parameters:
|
||||||
- name: flip-again-output
|
- name: flip-again-output
|
||||||
- name: flip-output
|
- name: flip-output
|
||||||
name: condition-2-child
|
name: condition-2
|
||||||
- inputs:
|
|
||||||
parameters:
|
|
||||||
- name: flip-output
|
|
||||||
name: condition-3
|
|
||||||
steps:
|
|
||||||
- - arguments:
|
|
||||||
parameters:
|
|
||||||
- name: flip-output
|
|
||||||
value: '{{inputs.parameters.flip-output}}'
|
|
||||||
name: condition-3-child
|
|
||||||
template: condition-3-child
|
|
||||||
when: '{{inputs.parameters.flip-output}} == tails'
|
|
||||||
- dag:
|
- dag:
|
||||||
tasks:
|
tasks:
|
||||||
- arguments:
|
- arguments:
|
||||||
|
|
@ -108,7 +70,7 @@ spec:
|
||||||
inputs:
|
inputs:
|
||||||
parameters:
|
parameters:
|
||||||
- name: flip-output
|
- name: flip-output
|
||||||
name: condition-3-child
|
name: condition-3
|
||||||
- container:
|
- container:
|
||||||
args:
|
args:
|
||||||
- python -c "import random; result = 'heads' if random.randint(0,1) == 0 else
|
- python -c "import random; result = 'heads' if random.randint(0,1) == 0 else
|
||||||
|
|
@ -201,6 +163,7 @@ spec:
|
||||||
- flip
|
- flip
|
||||||
name: condition-1
|
name: condition-1
|
||||||
template: condition-1
|
template: condition-1
|
||||||
|
when: '{{tasks.flip.outputs.parameters.flip-output}} == heads'
|
||||||
- arguments:
|
- arguments:
|
||||||
parameters:
|
parameters:
|
||||||
- name: flip-output
|
- name: flip-output
|
||||||
|
|
@ -209,6 +172,7 @@ spec:
|
||||||
- flip
|
- flip
|
||||||
name: condition-3
|
name: condition-3
|
||||||
template: condition-3
|
template: condition-3
|
||||||
|
when: '{{tasks.flip.outputs.parameters.flip-output}} == tails'
|
||||||
- name: flip
|
- name: flip
|
||||||
template: flip
|
template: flip
|
||||||
name: pipeline-flip-coin
|
name: pipeline-flip-coin
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue