add configuration for tekton pipeline spec (#487)

* add configuration for tekton pipeline spec

* sort annotation order
This commit is contained in:
Tommy Li 2021-03-05 17:20:49 -08:00 committed by GitHub
parent d73cafbab5
commit 6b1e02a46b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 165 additions and 4 deletions

View File

@ -40,6 +40,7 @@ from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8
from kfp_tekton.compiler._op_to_template import _op_to_template
from kfp_tekton.compiler.yaml_utils import dump_yaml
from kfp_tekton.compiler.any_sequencer import generate_any_sequencer
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
from kfp_tekton.compiler._tekton_hander import _handle_tekton_pipeline_variables, _handle_tekton_custom_task
DEFAULT_ARTIFACT_BUCKET = env.get('DEFAULT_ARTIFACT_BUCKET', 'mlpipeline')
@ -108,8 +109,14 @@ class TektonCompiler(Compiler):
self.output_artifacts = {}
self.artifact_items = {}
self.loops_pipeline = {}
self.pipeline_labels = {}
self.pipeline_annotations = {}
super().__init__(**kwargs)
def _set_pipeline_conf(self, tekton_pipeline_conf: TektonPipelineConf):
self.pipeline_labels = tekton_pipeline_conf.pipeline_labels
self.pipeline_annotations = tekton_pipeline_conf.pipeline_annotations
def _resolve_value_or_reference(self, value_or_reference, potential_references):
"""_resolve_value_or_reference resolves values and PipelineParams, which could be task parameters or input parameters.
Args:
@ -532,7 +539,7 @@ class TektonCompiler(Compiler):
'kind': 'PipelineRun',
'metadata': {
'name': sanitize_k8s_name(pipeline.name or 'Pipeline', suffix_space=4),
# 'labels': get_default_telemetry_labels(),
# Reflect the list of Tekton pipeline annotations at the top
'annotations': {
'tekton.dev/output_artifacts': json.dumps(self.output_artifacts, sort_keys=True),
'tekton.dev/input_artifacts': json.dumps(self.input_artifacts, sort_keys=True),
@ -559,6 +566,14 @@ class TektonCompiler(Compiler):
if any_sequencer_annotations:
pipeline_run['metadata']['annotations']['anyConditions'] = json.dumps(any_sequencer_annotations)
if self.pipeline_labels:
pipeline_run['metadata']['labels'] = pipeline_run['metadata'].setdefault('labels', {})
pipeline_run['metadata']['labels'].update(self.pipeline_labels)
if self.pipeline_annotations:
pipeline_run['metadata']['annotations'] = pipeline_run['metadata'].setdefault('annotations', {})
pipeline_run['metadata']['annotations'].update(self.pipeline_annotations)
# Generate TaskRunSpec PodTemplate:s
task_run_spec = []
for task in task_refs:
@ -746,7 +761,8 @@ class TektonCompiler(Compiler):
pipeline_func,
package_path,
type_check=True,
pipeline_conf: dsl.PipelineConf = None):
pipeline_conf: dsl.PipelineConf = None,
tekton_pipeline_conf: TektonPipelineConf = None):
"""Compile the given pipeline function into workflow yaml.
Args:
pipeline_func: pipeline functions with @dsl.pipeline decorator.
@ -756,6 +772,8 @@ class TektonCompiler(Compiler):
image pull secrets and other pipeline-level configuration options.
Overrides any configuration that may be set by the pipeline.
"""
if tekton_pipeline_conf:
self._set_pipeline_conf(tekton_pipeline_conf)
super().compile(pipeline_func, package_path, type_check, pipeline_conf=pipeline_conf)
@staticmethod

View File

@ -0,0 +1,39 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp import dsl
TEKTON_PIPELINE_ANNOTATIONS = ['anyConditions', 'sidecar.istio.io/inject', 'tekton.dev/artifact_bucket',
'tekton.dev/artifact_endpoint', 'tekton.dev/artifact_endpoint_scheme',
'tekton.dev/artifact_items', 'tekton.dev/input_artifacts', 'tekton.dev/output_artifacts']
class TektonPipelineConf(dsl.PipelineConf):
"""PipelineConf contains pipeline level settings."""
def __init__(self, **kwargs):
self.pipeline_labels = {}
self.pipeline_annotations = {}
super().__init__(**kwargs)
def add_pipeline_label(self, label_name: str, value: str):
self.pipeline_labels[label_name] = value
return self
def add_pipeline_annotation(self, annotation_name: str, value: str):
if annotation_name in TEKTON_PIPELINE_ANNOTATIONS:
raise ValueError('Cannot add pipeline annotation %s:%s because it is a reserved Tekton annotation.'
% annotation_name, value)
self.pipeline_annotations[annotation_name] = value
return self

View File

@ -362,6 +362,17 @@ class TestTektonCompiler(unittest.TestCase):
from .testdata.exit_handler import download_and_print
self._test_pipeline_workflow(download_and_print, 'exit_handler.yaml')
def test_tekton_pipeline_conf(self):
"""
Test applying Tekton pipeline config to a workflow
"""
from .testdata.tekton_pipeline_conf import echo_pipeline
pipeline_conf = compiler.pipeline_utils.TektonPipelineConf()
pipeline_conf.add_pipeline_label('test', 'label')
pipeline_conf.add_pipeline_label('test2', 'label2')
pipeline_conf.add_pipeline_annotation('test', 'annotation')
self._test_pipeline_workflow(echo_pipeline, 'tekton_pipeline_conf.yaml', tekton_pipeline_conf=pipeline_conf)
def test_compose(self):
"""
Test compiling a simple workflow, and a bigger one composed from a simple one.
@ -391,14 +402,16 @@ class TestTektonCompiler(unittest.TestCase):
def _test_pipeline_workflow(self,
pipeline_function,
pipeline_yaml,
normalize_compiler_output_function=None):
normalize_compiler_output_function=None,
tekton_pipeline_conf=None):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
temp_dir = tempfile.mkdtemp()
compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml')
try:
compiler.TektonCompiler().compile(pipeline_function,
compiled_yaml_file)
compiled_yaml_file,
tekton_pipeline_conf=tekton_pipeline_conf)
with open(compiled_yaml_file, 'r') as f:
f = normalize_compiler_output_function(
f.read()) if normalize_compiler_output_function else f

View File

@ -0,0 +1,44 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp import dsl
import kfp_tekton
def echo_op():
return dsl.ContainerOp(
name='echo',
image='busybox',
command=['sh', '-c'],
arguments=['echo "Got scheduled"']
)
@dsl.pipeline(
name='echo',
description='echo pipeline'
)
def echo_pipeline():
echo_op()
pipeline_conf = kfp_tekton.compiler.pipeline_utils.TektonPipelineConf()
pipeline_conf.add_pipeline_label('test', 'label')
pipeline_conf.add_pipeline_label('test2', 'label2')
pipeline_conf.add_pipeline_annotation('test', 'annotation')
if __name__ == "__main__":
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(echo_pipeline, 'echo_pipeline.yaml', tekton_pipeline_conf=pipeline_conf)

View File

@ -0,0 +1,47 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "echo pipeline", "name":
"echo"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"echo": []}'
tekton.dev/input_artifacts: '{}'
tekton.dev/output_artifacts: '{}'
test: annotation
labels:
test: label
test2: label2
name: echo
spec:
pipelineSpec:
tasks:
- name: echo
taskSpec:
steps:
- args:
- echo "Got scheduled"
command:
- sh
- -c
image: busybox
name: main
timeout: 0s
timeout: 0s