SDK - Decoupling ContainerOp from compiler (#1168)

* SDK - Decoupling ContainerOp from compiler
Currently, some code in DSL module depends on some classes that belong to the DSL-compiler.
Ideally, the dependency should go the the other way - the DSL-compiler should depend on DSL, but not the other way around.

This commit fixes that issue for the ContainerOp class.

* Switched from a list of handlers to a single handler
This commit is contained in:
Alexey Volkov 2019-04-23 13:42:01 -07:00 committed by Kubernetes Prow Robot
parent fe042540e8
commit c777401bf1
2 changed files with 21 additions and 6 deletions

View File

@ -20,7 +20,6 @@ from kubernetes.client.models import (
V1ResourceRequirements, V1VolumeDevice, V1VolumeMount, V1ContainerPort,
V1Lifecycle)
from . import _pipeline
from . import _pipeline_param
from ._metadata import ComponentMeta
@ -623,6 +622,15 @@ class Sidecar(Container):
return _pipeline_param.extract_pipelineparams_from_any(self)
def _make_hash_based_id_for_container_op(container_op):
# Generating a unique ID for ContainerOp. For class instances, the hash is the object's memory address which is unique.
return container_op.human_name + ' ' + hex(2**63 + hash(container_op))[2:]
# Pointer to a function that generates a unique ID for the ContainerOp instance (Possibly by registering the ContainerOp instance in some system).
_register_container_op_handler = _make_hash_based_id_for_container_op
class ContainerOp(object):
"""
Represents an op implemented by a container image.
@ -693,8 +701,6 @@ class ContainerOp(object):
one way for outside world to receive outputs of the container.
is_exit_handler: Whether it is used as an exit handler.
"""
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
@ -708,9 +714,6 @@ class ContainerOp(object):
# human_name must exist to construct containerOps name
self.human_name = name
# actual name for argo workflow
self.name = _pipeline.Pipeline.get_default_pipeline().add_op(
self, is_exit_handler)
# `container` prop in `io.argoproj.workflow.v1alpha1.Template`
container_kwargs = container_kwargs or {}
@ -761,6 +764,11 @@ class ContainerOp(object):
self.is_exit_handler = is_exit_handler
self._metadata = None
# ID of the current ContainerOp. Ideally, it should be generated by the compiler that sees the bigger context.
# However, the ID is used in the task output references (PipelineParams) which can be serialized to strings.
# Because of this we must obtain a unique ID right now.
self.name = _register_container_op_handler(self)
self.outputs = {}
if file_outputs:
self.outputs = {

View File

@ -120,10 +120,17 @@ class Pipeline():
raise Exception('Nested pipelines are not allowed.')
Pipeline._default_pipeline = self
def register_op_and_generate_id(op):
return self.add_op(op, op.is_exit_handler)
self._old__register_container_op_handler = _container_op._register_container_op_handler
_container_op._register_container_op_handler = register_op_and_generate_id
return self
def __exit__(self, *args):
Pipeline._default_pipeline = None
_container_op._register_container_op_handler = self._old__register_container_op_handler
def add_op(self, op: _container_op.ContainerOp, define_only: bool):
"""Add a new operator.