Compare commits

...

17 Commits

Author SHA1 Message Date
Tommy Li 0b89419544
chore(owners): Promote rafalbigaj as the approver (#1484)
Signed-off-by: tomcli <tommy.chaoping.li@ibm.com>
2024-04-17 22:13:39 +00:00
Tommy Li 3e7950ffd3
chore(deps): sync kfp deps with the latest commit (#1485)
Signed-off-by: tomcli <tommy.chaoping.li@ibm.com>
2024-04-16 23:32:26 +00:00
Tommy Li bb47bcd892
chore(ci): Add clean up step for tekton ci (#1480) 2024-03-27 17:36:21 +00:00
Tommy Li c0d25310d5
chore(kfp-task): Update driver package to 2.1.0 release (#1478) 2024-03-27 00:11:19 +00:00
Tommy Li b49f959db9
chore(tekton-driver): Update tekton v2 driver to support the latest k8s spec from upstream (#1464)
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
2024-03-06 03:38:07 +00:00
Helber Belmiro 16e781dce9
fix(docs): Updated legal info due to migration from CLA to DCO (#1463)
* Updated legal info due to migration from CLA to DCO

Signed-off-by: hbelmiro <helber.belmiro@gmail.com>

* Fixed TOC

Signed-off-by: hbelmiro <helber.belmiro@gmail.com>

---------

Signed-off-by: hbelmiro <helber.belmiro@gmail.com>
2024-03-05 17:18:07 +00:00
Tommy Li 803377e899
chore(sdk): Add sdk 1.9.3 release (#1462)
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
2024-02-27 23:12:59 +00:00
Tommy Li db6d85ece6
feat(sdk): add verify_ssl flag to support self cert (#1461)
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
2024-02-27 22:52:59 +00:00
Tommy Li 9f568f2a72
feat(ci): Update github actions to also test python 3.12 (#1456)
* Update github actions to also test python 3.12

* Update setup.py

* Update setup.py

* Update kfp-tekton-unittests.yml

* Update kfp-tekton-unittests.yml

* Update kfp-tekton-unittests.yml

* Update kfp-tekton-unittests.yml

* Update README.md
2024-02-13 18:35:05 +00:00
Tommy Li a9d7df96d2
fix(README): Update instructions to only use kustomize (#1455) 2024-02-13 00:07:04 +00:00
Tommy Li b77e6f38d5
Chore(docs) Update kfp_tekton_install V2 instructions for KFP-Tekton 2.0.5 release (#1453) 2024-01-18 17:35:59 +00:00
Tommy Li bb06e5e721
chore(release): add kfp-tekton backend 1.9.2 release (#1451) 2024-01-18 00:21:58 +00:00
Tommy Li 550a827b05
feat(tekton-kfptask): Update kfptask to support pod metadata (#1449)
* update kfptask to support pod metadata

* fix type
2024-01-17 08:53:54 +00:00
Tommy Li d5fc9fd5c9
chore(README): Remove deprecated MLX wording (#1447) 2024-01-10 22:50:11 +00:00
Tommy Li a71ba164ad
feat(pipelineloop): Update pipelineloop v2 to have failed and cancelled status (#1445)
* update pipelineloop v2 to have failed and cancelled status

* fix execution id update bug
2024-01-05 19:25:09 +00:00
Tommy Li ff8bb50dc4
chore(tests): add unit tests for tekton template v1 (#1444)
* add unit tests for tekton template v1

* update license
2024-01-03 23:34:11 +00:00
Tommy Li 08e438099a
feat(pipelineloop): Optimize pipelineloop get performance with Lister (#1443) 2024-01-02 20:32:14 +00:00
36 changed files with 3903 additions and 4566 deletions

View File

@ -9,14 +9,13 @@ on:
env: env:
GITHUB_ACTION: "true" GITHUB_ACTION: "true"
SETUPTOOLS_USE_DISTUTILS: "stdlib"
jobs: jobs:
python-unittest: python-unittest:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
python-version: ['3.8', '3.9', '3.10', '3.11'] python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}

View File

@ -66,6 +66,7 @@ spec:
command: ["/bin/bash", "-c"] command: ["/bin/bash", "-c"]
args: args:
- set -ex; - set -ex;
rm -r /artifacts/*;
cd /artifacts && git clone -q -b $GIT_BRANCH $GIT_URL .; cd /artifacts && git clone -q -b $GIT_BRANCH $GIT_URL .;
GIT_COMMIT=$(git rev-parse HEAD); GIT_COMMIT=$(git rev-parse HEAD);
source ./scripts/deploy/iks/run-test.sh; source ./scripts/deploy/iks/run-test.sh;

View File

@ -9,7 +9,7 @@ just a few small guidelines you need to follow.
<!-- START of ToC generated by running ./tools/mdtoc.sh CONTRIBUTING.md --> <!-- START of ToC generated by running ./tools/mdtoc.sh CONTRIBUTING.md -->
- [Project Structure](#project-structure) - [Project Structure](#project-structure)
- [Contributor License Agreement](#contributor-license-agreement) - [Legal](#legal)
- [Coding Style](#coding-style) - [Coding Style](#coding-style)
- [Unit Testing Best Practices](#unit-testing-best-practices) - [Unit Testing Best Practices](#unit-testing-best-practices)
- [Golang](#golang) - [Golang](#golang)
@ -35,17 +35,11 @@ To get started, see the development guides:
* [Backend development guide](./backend/README.md) * [Backend development guide](./backend/README.md)
* [SDK development guide](./sdk/python/README.md) * [SDK development guide](./sdk/python/README.md)
## Contributor License Agreement ## Legal
Contributions to this project must be accompanied by a Contributor License Kubeflow uses Developer Certificate of Origin ([DCO](https://github.com/apps/dco/)).
Agreement. You (or your employer) retain the copyright to your contribution;
this simply gives us permission to use and redistribute your contributions as
part of the project. Head over to <https://cla.developers.google.com/> to see
your current agreements on file or to sign a new one.
You generally only need to submit a CLA once, so if you've already submitted one Please see https://github.com/kubeflow/community/tree/master/dco-signoff-hook#signing-off-commits to learn how to sign off your commits.
(even if it was for a different project), you probably don't need to do it
again.
## Coding Style ## Coding Style

View File

@ -16,7 +16,7 @@
# - The help target was derived from https://stackoverflow.com/a/35730328/5601796 # - The help target was derived from https://stackoverflow.com/a/35730328/5601796
VENV ?= .venv VENV ?= .venv
KFP_TEKTON_RELEASE ?= v1.9.1 KFP_TEKTON_RELEASE ?= v1.9.2
export VIRTUAL_ENV := $(abspath ${VENV}) export VIRTUAL_ENV := $(abspath ${VENV})
export PATH := ${VIRTUAL_ENV}/bin:${PATH} export PATH := ${VIRTUAL_ENV}/bin:${PATH}
DOCKER_REGISTRY ?= aipipeline DOCKER_REGISTRY ?= aipipeline

1
OWNERS
View File

@ -6,6 +6,7 @@ approvers:
- pugangxa - pugangxa
- scrapcodes - scrapcodes
- yhwang - yhwang
- rafalbigaj
reviewers: reviewers:
- ckadner - ckadner
- Tomcli - Tomcli

View File

@ -11,8 +11,6 @@ according to this [design doc](http://bit.ly/kfp-tekton). The current code allow
For more details about the project please follow this detailed [blog post](https://developer.ibm.com/blogs/awb-tekton-optimizations-for-kubeflow-pipelines-2-0) . For the latest KFP-Tekton V2 implementation and [supported offerings](https://developer.ibm.com/articles/advance-machine-learning-workflows-with-ibm-watson-pipelines/), please follow our latest [Kubecon Talk](https://www.youtube.com/watch?v=ecx-yp4g7YU) and [slides](https://docs.google.com/presentation/d/1Su42ApXzZvVwhNSYRAk3bd0heHOtrdEX/edit?usp=sharing&ouid=103716780892927252554&rtpof=true&sd=true). For information on the KFP-Tekton V1 implementation, look at these [slides](https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976) as well as this [deep dive presentation](https://www.youtube.com/watch?v=AYIeNtXLT_k) for demos. For more details about the project please follow this detailed [blog post](https://developer.ibm.com/blogs/awb-tekton-optimizations-for-kubeflow-pipelines-2-0) . For the latest KFP-Tekton V2 implementation and [supported offerings](https://developer.ibm.com/articles/advance-machine-learning-workflows-with-ibm-watson-pipelines/), please follow our latest [Kubecon Talk](https://www.youtube.com/watch?v=ecx-yp4g7YU) and [slides](https://docs.google.com/presentation/d/1Su42ApXzZvVwhNSYRAk3bd0heHOtrdEX/edit?usp=sharing&ouid=103716780892927252554&rtpof=true&sd=true). For information on the KFP-Tekton V1 implementation, look at these [slides](https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976) as well as this [deep dive presentation](https://www.youtube.com/watch?v=AYIeNtXLT_k) for demos.
**Note**: If you are interested in a sister project built on top of Kubeflow Pipelines with Tekton, please try [Machine Learning eXchange (MLX)](https://github.com/machine-learning-exchange), Data and AI Assets Catalog and Execution Engine. It introduces a 'Component Registry' for Kubeflow Pipelines, amongst other things.
## Architecture ## Architecture
We are currently using [Kubeflow Pipelines 1.8.4](https://github.com/kubeflow/pipelines/releases/tag/1.8.4) and We are currently using [Kubeflow Pipelines 1.8.4](https://github.com/kubeflow/pipelines/releases/tag/1.8.4) and

View File

@ -1 +1 @@
1.9.1 1.9.2

View File

@ -1,3 +1,17 @@
// Copyright 2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package template package template
import ( import (

View File

@ -0,0 +1,131 @@
// Copyright 2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package template
import (
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)
func TestGetConfigMapVolumeSource(t *testing.T) {
t.Run("Returns correct volume source", func(t *testing.T) {
// Create a new instance of Tekton
tekton := &Tekton{}
// Define the expected volume source
expectedVolumeSource := corev1.Volume{
Name: "testVolume",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "testConfigMap",
},
},
},
}
// Call the getConfigMapVolumeSource method
volumeSource := tekton.getConfigMapVolumeSource("testVolume", "testConfigMap")
// Assert that the returned volume source matches the expected volume source
assert.Equal(t, expectedVolumeSource, volumeSource)
})
}
func TestGetHostPathVolumeSource(t *testing.T) {
t.Run("Returns correct volume source", func(t *testing.T) {
// Create a new instance of Tekton
tekton := &Tekton{}
// Define the expected volume source
expectedVolumeSource := corev1.Volume{
Name: "testVolume",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "testPath",
},
},
}
// Call the getHostPathVolumeSource method
volumeSource := tekton.getHostPathVolumeSource("testVolume", "testPath")
// Assert that the returned volume source matches the expected volume source
assert.Equal(t, expectedVolumeSource, volumeSource)
})
}
func TestGetEnvVar(t *testing.T) {
t.Run("Returns correct environment variable", func(t *testing.T) {
// Create a new instance of Tekton
tekton := &Tekton{}
// Define the input parameters
name := "testName"
value := "testValue"
// Call the getEnvVar method
envVar := tekton.getEnvVar(name, value)
// Assert that the returned environment variable has the correct name and value
assert.Equal(t, name, envVar.Name)
assert.Equal(t, value, envVar.Value)
})
}
func TestGetSecretKeySelector(t *testing.T) {
t.Run("Returns correct environment variable", func(t *testing.T) {
// Create a new instance of Tekton
tekton := &Tekton{}
// Define the input parameters
name := "testName"
objectName := "testObjectName"
objectKey := "testObjectKey"
// Call the getSecretKeySelector method
envVar := tekton.getSecretKeySelector(name, objectName, objectKey)
// Assert that the returned environment variable has the correct name
assert.Equal(t, name, envVar.Name)
// Assert that the returned environment variable has the correct SecretKeyRef
assert.NotNil(t, envVar.ValueFrom.SecretKeyRef)
assert.Equal(t, objectName, envVar.ValueFrom.SecretKeyRef.LocalObjectReference.Name)
assert.Equal(t, objectKey, envVar.ValueFrom.SecretKeyRef.Key)
})
}
func TestGetObjectFieldSelector(t *testing.T) {
t.Run("Returns correct environment variable", func(t *testing.T) {
// Create a new instance of Tekton
tekton := &Tekton{}
// Define the input parameters
name := "testName"
fieldPath := "testFieldPath"
// Call the getObjectFieldSelector method
envVar := tekton.getObjectFieldSelector(name, fieldPath)
// Assert that the returned environment variable has the correct name
assert.Equal(t, name, envVar.Name)
// Assert that the returned environment variable has the correct ObjectFieldSelector
assert.NotNil(t, envVar.ValueFrom.FieldRef)
assert.Equal(t, fieldPath, envVar.ValueFrom.FieldRef.FieldPath)
})
}

View File

@ -52,8 +52,8 @@ Each new KFP-Tekton version is based on the long-term support of the Tekton Pipe
| 1.7.x | 0.47.x | 1.11 | V1beta1 | 1.16.0 | | 1.7.x | 0.47.x | 1.11 | V1beta1 | 1.16.0 |
| 1.8.x | 0.50.x | 1.12 | V1 | 2.11.3 | | 1.8.x | 0.50.x | 1.12 | V1 | 2.11.3 |
| 1.9.x | 0.53.x | 1.13 | V1 | 2.11.3 | | 1.9.x | 0.53.x | 1.13 | V1 | 2.11.3 |
| 2.0.x | 0.47.x | 1.11 | V1beta1 | 1.16.0 | | 2.0.3 | 0.47.x | 1.11 | V1beta1 | 1.16.0 |
| 2.1.x | 0.53.x | 1.13 | V1 | 1.16.0 | | 2.0.5 | 0.53.x | 1.13 | V1 | 1.16.0 |
## Standalone Kubeflow Pipelines V1 with Tekton Backend Deployment ## Standalone Kubeflow Pipelines V1 with Tekton Backend Deployment
@ -73,17 +73,12 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
-p '{"data":{"default-timeout-minutes": "0"}}' -p '{"data":{"default-timeout-minutes": "0"}}'
``` ```
3. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.1` [custom resource definitions](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)(CRDs). 3. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.2` deployment
```shell ```shell
kubectl apply --selector kubeflow/crd-install=true -f https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.1/kfp-tekton.yaml kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/kfp-template\?ref\=v1.9.2
``` ```
4. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.1` deployment 4. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
```shell
kubectl apply -f https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.1/kfp-tekton.yaml
```
5. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
```shell ```shell
kubectl patch svc ml-pipeline-ui -n kubeflow -p '{"spec": {"type": "LoadBalancer"}}' kubectl patch svc ml-pipeline-ui -n kubeflow -p '{"spec": {"type": "LoadBalancer"}}'
``` ```
@ -93,15 +88,15 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
kubectl get svc ml-pipeline-ui -n kubeflow -o jsonpath='{.status.loadBalancer.ingress[0].ip}' kubectl get svc ml-pipeline-ui -n kubeflow -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
``` ```
6. (GPU worker nodes only) If your Kubernetes cluster has a mixture of CPU and GPU worker nodes, it's recommended to disable the Tekton default affinity assistant so that Tekton won't schedule too many CPU workloads on the GPU nodes. 5. (GPU worker nodes only) If your Kubernetes cluster has a mixture of CPU and GPU worker nodes, it's recommended to disable the Tekton default affinity assistant so that Tekton won't schedule too many CPU workloads on the GPU nodes.
```shell ```shell
kubectl patch cm feature-flags -n tekton-pipelines \ kubectl patch cm feature-flags -n tekton-pipelines \
-p '{"data":{"disable-affinity-assistant": "true"}}' -p '{"data":{"disable-affinity-assistant": "true"}}'
``` ```
7. (OpenShift only) If you are running the standalone KFP-Tekton on OpenShift, apply the necessary security context constraint below 6. (OpenShift only) If you are running the standalone KFP-Tekton on OpenShift, apply the necessary security context constraint below
```shell ```shell
curl -L https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.1/kfp-tekton.yaml | yq 'del(.spec.template.spec.containers[].securityContext.runAsUser, .spec.template.spec.containers[].securityContext.runAsGroup)' | oc apply -f - curl -L https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.2/kfp-tekton.yaml | yq 'del(.spec.template.spec.containers[].securityContext.runAsUser, .spec.template.spec.containers[].securityContext.runAsGroup)' | oc apply -f -
oc apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/third-party/openshift/standalone oc apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/third-party/openshift/standalone
oc adm policy add-scc-to-user anyuid -z tekton-pipelines-controller oc adm policy add-scc-to-user anyuid -z tekton-pipelines-controller
oc adm policy add-scc-to-user anyuid -z tekton-pipelines-webhook oc adm policy add-scc-to-user anyuid -z tekton-pipelines-webhook
@ -111,9 +106,9 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
To install the standalone Kubeflow Pipelines V2 with Tekton, run the following steps: To install the standalone Kubeflow Pipelines V2 with Tekton, run the following steps:
1. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v2.0.3` along with Tekton `v0.47.5` 1. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v2.0.5` along with Tekton `v0.53.2`
```shell ```shell
kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/platform-agnostic-tekton\?ref\=v2.0.3 kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/platform-agnostic-tekton\?ref\=v2.0.5
``` ```
2. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands: 2. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
@ -135,7 +130,7 @@ To install the standalone Kubeflow Pipelines V2 with Tekton, run the following s
Now, please use the [KFP V2 Python SDK](https://pypi.org/project/kfp/) to compile KFP-Tekton V2 pipelines because we are sharing the same pipeline spec starting from KFP V2.0.0. Now, please use the [KFP V2 Python SDK](https://pypi.org/project/kfp/) to compile KFP-Tekton V2 pipelines because we are sharing the same pipeline spec starting from KFP V2.0.0.
```shell ```shell
pip install "kfp>=2.0" "kfp-kubernetes>=1.0.0" pip install "kfp>=2.6.0" "kfp-kubernetes>=1.1.0"
``` ```
## Standalone Kubeflow Pipelines with Openshift Pipelines Backend Deployment ## Standalone Kubeflow Pipelines with Openshift Pipelines Backend Deployment

File diff suppressed because it is too large Load Diff

View File

@ -11,4 +11,4 @@ commonLabels:
images: images:
- name: gcr.io/ml-pipeline/cache-server - name: gcr.io/ml-pipeline/cache-server
newName: quay.io/aipipeline/cache-server newName: quay.io/aipipeline/cache-server
newTag: 1.9.1 newTag: 1.9.2

View File

@ -42,20 +42,20 @@ patches:
images: images:
- name: gcr.io/ml-pipeline/api-server - name: gcr.io/ml-pipeline/api-server
newName: quay.io/aipipeline/api-server newName: quay.io/aipipeline/api-server
newTag: 1.9.1 newTag: 1.9.2
- name: gcr.io/ml-pipeline/persistenceagent - name: gcr.io/ml-pipeline/persistenceagent
newName: quay.io/aipipeline/persistenceagent newName: quay.io/aipipeline/persistenceagent
newTag: 1.9.1 newTag: 1.9.2
- name: gcr.io/ml-pipeline/scheduledworkflow - name: gcr.io/ml-pipeline/scheduledworkflow
newName: quay.io/aipipeline/scheduledworkflow newName: quay.io/aipipeline/scheduledworkflow
newTag: 1.9.1 newTag: 1.9.2
- name: gcr.io/ml-pipeline/frontend - name: gcr.io/ml-pipeline/frontend
newName: quay.io/aipipeline/frontend newName: quay.io/aipipeline/frontend
newTag: 1.9.1 newTag: 1.9.2
- name: gcr.io/ml-pipeline/viewer-crd-controller - name: gcr.io/ml-pipeline/viewer-crd-controller
newTag: 1.8.4 newTag: 1.8.4
- name: gcr.io/ml-pipeline/visualization-server - name: gcr.io/ml-pipeline/visualization-server
newTag: 1.8.4 newTag: 1.8.4
- name: gcr.io/ml-pipeline/metadata-writer - name: gcr.io/ml-pipeline/metadata-writer
newName: quay.io/aipipeline/metadata-writer newName: quay.io/aipipeline/metadata-writer
newTag: 1.9.1 newTag: 1.9.2

View File

@ -8,6 +8,6 @@ namespace: tekton-pipelines
images: images:
- name: quay.io/aipipeline/pipelineloop-controller - name: quay.io/aipipeline/pipelineloop-controller
newTag: 1.9.1 newTag: 1.9.2
- name: quay.io/aipipeline/pipelineloop-webhook - name: quay.io/aipipeline/pipelineloop-webhook
newTag: 1.9.1 newTag: 1.9.2

View File

@ -61,7 +61,7 @@ adding the `TektonCompiler` and the `TektonClient`:
## Project Prerequisites ## Project Prerequisites
- Python: `3.8` or later - Python: `3.8` or later. For Python 3.12, make sure to not have the `SETUPTOOLS_USE_DISTUTILS` flag because it's already [deprecated](https://github.com/pypa/setuptools/issues/4002).
- Tekton: [`v0.53.2`](https://github.com/tektoncd/pipeline/releases/tag/v0.53.2) or [later](https://github.com/tektoncd/pipeline/releases/latest) - Tekton: [`v0.53.2`](https://github.com/tektoncd/pipeline/releases/tag/v0.53.2) or [later](https://github.com/tektoncd/pipeline/releases/latest)
- Tekton CLI: [`0.30.1`](https://github.com/tektoncd/cli/releases/tag/v0.30.1) - Tekton CLI: [`0.30.1`](https://github.com/tektoncd/cli/releases/tag/v0.30.1)
- Kubeflow Pipelines: [KFP with Tekton backend](/guides/kfp_tekton_install.md) - Kubeflow Pipelines: [KFP with Tekton backend](/guides/kfp_tekton_install.md)

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
__version__ = '1.9.2' __version__ = '1.9.3'
from ._client import TektonClient # noqa F401 from ._client import TektonClient # noqa F401
from .k8s_client_helper import env_from_secret # noqa F401 from .k8s_client_helper import env_from_secret # noqa F401

View File

@ -16,6 +16,7 @@
import os import os
import tempfile import tempfile
import time import time
import warnings
import datetime import datetime
from typing import Mapping, Callable, Optional from typing import Mapping, Callable, Optional
@ -27,6 +28,7 @@ from kfp_tekton_server_api import ApiException
from .compiler import TektonCompiler from .compiler import TektonCompiler
from .compiler.pipeline_utils import TektonPipelineConf from .compiler.pipeline_utils import TektonPipelineConf
from kfp._auth import get_auth_token, get_gcp_access_token
import json import json
import logging import logging
@ -106,7 +108,8 @@ class TektonClient(kfp.Client):
ssl_ca_cert=None, ssl_ca_cert=None,
kube_context=None, kube_context=None,
credentials=None, credentials=None,
ui_host=None): ui_host=None,
verify_ssl=None):
"""Create a new instance of kfp client.""" """Create a new instance of kfp client."""
host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV) host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV)
self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, ui_host or self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, ui_host or
@ -120,7 +123,7 @@ class TektonClient(kfp.Client):
config = self._load_config(host, client_id, namespace, other_client_id, config = self._load_config(host, client_id, namespace, other_client_id,
other_client_secret, existing_token, proxy, other_client_secret, existing_token, proxy,
ssl_ca_cert, kube_context, credentials) ssl_ca_cert, kube_context, credentials, verify_ssl)
# Save the loaded API client configuration, as a reference if update is # Save the loaded API client configuration, as a reference if update is
# needed. # needed.
self._load_context_setting_or_default() self._load_context_setting_or_default()
@ -162,7 +165,106 @@ class TektonClient(kfp.Client):
except FileNotFoundError: except FileNotFoundError:
logging.info( logging.info(
'Failed to automatically set namespace.', exc_info=False) 'Failed to automatically set namespace.', exc_info=False)
def _load_config(self, host, client_id, namespace, other_client_id,
other_client_secret, existing_token, proxy, ssl_ca_cert,
kube_context, credentials, verify_ssl):
config = kfp_server_api.configuration.Configuration()
if proxy:
# https://github.com/kubeflow/pipelines/blob/c6ac5e0b1fd991e19e96419f0f508ec0a4217c29/backend/api/python_http_client/kfp_server_api/rest.py#L100
config.proxy = proxy
if verify_ssl is not None:
config.verify_ssl = verify_ssl
if ssl_ca_cert:
config.ssl_ca_cert = ssl_ca_cert
host = host or ''
# Defaults to 'https' if host does not contain 'http' or 'https' protocol.
if host and not host.startswith('http'):
warnings.warn(
'The host %s does not contain the "http" or "https" protocol.'
' Defaults to "https".' % host)
host = 'https://' + host
# Preprocess the host endpoint to prevent some common user mistakes.
if not client_id:
# always preserving the protocol (http://localhost requires it)
host = host.rstrip('/')
if host:
config.host = host
token = None
# "existing_token" is designed to accept token generated outside of SDK. Here is an example.
#
# https://cloud.google.com/functions/docs/securing/function-identity
# https://cloud.google.com/endpoints/docs/grpc/service-account-authentication
#
# import requests
# import kfp
#
# def get_access_token():
# url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token'
# r = requests.get(url, headers={'Metadata-Flavor': 'Google'})
# r.raise_for_status()
# access_token = r.json()['access_token']
# return access_token
#
# client = kfp.Client(host='<KFPHost>', existing_token=get_access_token())
#
if existing_token:
token = existing_token
self._is_refresh_token = False
elif client_id:
token = get_auth_token(client_id, other_client_id,
other_client_secret)
self._is_refresh_token = True
elif self._is_inverse_proxy_host(host):
token = get_gcp_access_token()
self._is_refresh_token = False
elif credentials:
config.api_key['authorization'] = 'placeholder'
config.api_key_prefix['authorization'] = 'Bearer'
config.refresh_api_key_hook = credentials.refresh_api_key_hook
if token:
config.api_key['authorization'] = token
config.api_key_prefix['authorization'] = 'Bearer'
return config
if host:
# if host is explicitly set with auth token, it's probably a port forward address.
return config
import kubernetes as k8s
in_cluster = True
try:
k8s.config.load_incluster_config()
except:
in_cluster = False
pass
if in_cluster:
config.host = TektonClient.IN_CLUSTER_DNS_NAME.format(namespace)
config = self._get_config_with_default_credentials(config)
return config
try:
k8s.config.load_kube_config(
client_configuration=config, context=kube_context)
except:
print('Failed to load kube config.')
return config
if config.host:
config.host = config.host + '/' + TektonClient.KUBE_PROXY_PATH.format(
namespace)
return config
def wait_for_run_completion(self, run_id: str, timeout: int): def wait_for_run_completion(self, run_id: str, timeout: int):
"""Waits for a run to complete. """Waits for a run to complete.

View File

@ -20,14 +20,14 @@
# #
# To create a distribution for PyPi run: # To create a distribution for PyPi run:
# #
# $ export KFP_TEKTON_VERSION=1.9.2-rc1 # $ export KFP_TEKTON_VERSION=1.9.3-rc1
# $ python3 setup.py sdist # $ python3 setup.py sdist
# $ twine check dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz # $ twine check dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
# $ twine upload --repository pypi dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz # $ twine upload --repository pypi dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
# #
# ... or: # ... or:
# #
# $ make distribution KFP_TEKTON_VERSION=1.9.2-rc1 # $ make distribution KFP_TEKTON_VERSION=1.9.3-rc1
# #
# ============================================================================= # =============================================================================
@ -61,7 +61,8 @@ logger.setLevel(logging.INFO)
REQUIRES = [ REQUIRES = [
"kfp>=1.8.10,<1.8.23", "kfp>=1.8.10,<1.8.23",
"kfp-tekton-server-api==1.8.0rc8", "kfp-tekton-server-api==1.8.0rc8",
"PyYAML>=6,<7" "PyYAML>=6,<7",
"setuptools>=69.1"
] ]
TESTS_REQUIRE = [ TESTS_REQUIRE = [

View File

@ -1 +1 @@
1.8.1 1.9.2

View File

@ -8,12 +8,12 @@ require (
github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-multierror v1.1.1
github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0 github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0 github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-00010101000000-000000000000 github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20231127195001-a75d4b3711ff
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03 github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
github.com/tektoncd/pipeline v0.53.2 github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.26.0 go.uber.org/zap v1.26.0
gomodules.xyz/jsonpatch/v2 v2.4.0 gomodules.xyz/jsonpatch/v2 v2.4.0
k8s.io/api v0.27.1 k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.3 k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2 k8s.io/client-go v0.27.2
k8s.io/utils v0.0.0-20230505201702-9f6742963106 k8s.io/utils v0.0.0-20230505201702-9f6742963106
@ -30,7 +30,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/IBM/ibm-cos-sdk-go v1.8.0 // indirect github.com/IBM/ibm-cos-sdk-go v1.8.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect github.com/aws/aws-sdk-go v1.45.25 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect github.com/blendle/zapdriver v1.3.1 // indirect
@ -41,16 +41,16 @@ require (
github.com/emicklei/go-restful/v3 v3.10.2 // indirect github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/google/cel-go v0.12.6 // indirect github.com/google/cel-go v0.12.6 // indirect
@ -74,21 +74,21 @@ require (
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03 // indirect github.com/kubeflow/pipelines v0.0.0-20240416215826-da804407ad31 // indirect
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03 // indirect github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31 // indirect
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03 // indirect github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240416215826-da804407ad31 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect github.com/mattn/go-sqlite3 v1.14.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect
@ -100,7 +100,7 @@ require (
golang.org/x/crypto v0.17.0 // indirect golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/mod v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.15.0 // indirect golang.org/x/sys v0.15.0 // indirect
@ -122,8 +122,8 @@ require (
gorm.io/driver/mysql v1.4.3 // indirect gorm.io/driver/mysql v1.4.3 // indirect
gorm.io/driver/sqlite v1.4.2 // indirect gorm.io/driver/sqlite v1.4.2 // indirect
gorm.io/gorm v1.24.0 // indirect gorm.io/gorm v1.24.0 // indirect
k8s.io/apiextensions-apiserver v0.26.5 // indirect k8s.io/apiextensions-apiserver v0.27.2 // indirect
k8s.io/code-generator v0.26.5 // indirect k8s.io/code-generator v0.27.2 // indirect
k8s.io/gengo v0.0.0-20221011193443-fad74ee6edd9 // indirect k8s.io/gengo v0.0.0-20221011193443-fad74ee6edd9 // indirect
k8s.io/klog/v2 v2.100.1 // indirect k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect

View File

@ -211,6 +211,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *tektonv1beta1
} }
logger.Infof("Received control for a CustomRun %s/%s %-v", customRun.Namespace, customRun.Name, customRun.Spec.CustomSpec) logger.Infof("Received control for a CustomRun %s/%s %-v", customRun.Namespace, customRun.Name, customRun.Spec.CustomSpec)
// If the CustomRun has not started, initialize the Condition and set the start time. // If the CustomRun has not started, initialize the Condition and set the start time.
firstIteration := false
if !customRun.HasStarted() { if !customRun.HasStarted() {
logger.Infof("Starting new CustomRun %s/%s", customRun.Namespace, customRun.Name) logger.Infof("Starting new CustomRun %s/%s", customRun.Namespace, customRun.Name)
customRun.Status.InitializeConditions() customRun.Status.InitializeConditions()
@ -226,6 +227,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *tektonv1beta1
// on the event to perform user facing initialisations, such has reset a CI check status // on the event to perform user facing initialisations, such has reset a CI check status
afterCondition := customRun.Status.GetCondition(apis.ConditionSucceeded) afterCondition := customRun.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(ctx, nil, afterCondition, customRun) events.Emit(ctx, nil, afterCondition, customRun)
firstIteration = true
} }
// Store the condition before reconcile // Store the condition before reconcile
@ -264,7 +266,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *tektonv1beta1
return nil return nil
} }
// Reconcile the Run // Reconcile the Run
if err := c.reconcile(ctx, customRun, status); err != nil { if err := c.reconcile(ctx, customRun, status, firstIteration); err != nil {
logger.Errorf("Reconcile error: %v", err.Error()) logger.Errorf("Reconcile error: %v", err.Error())
merr = multierror.Append(merr, err) merr = multierror.Append(merr, err)
} }
@ -341,7 +343,7 @@ func (c *Reconciler) setMaxNestedStackDepth(ctx context.Context, pipelineLoopSpe
} }
} else if t.TaskRef != nil { } else if t.TaskRef != nil {
if t.TaskRef.Kind == "PipelineLoop" { if t.TaskRef.Kind == "PipelineLoop" {
tl, err := c.pipelineloopClientSet.CustomV1alpha1().PipelineLoops(customRun.Namespace).Get(ctx, t.TaskRef.Name, metav1.GetOptions{}) tl, err := c.pipelineLoopLister.PipelineLoops(customRun.Namespace).Get(t.TaskRef.Name)
if err == nil && tl != nil { if err == nil && tl != nil {
if len(tl.ObjectMeta.Annotations) == 0 { if len(tl.ObjectMeta.Annotations) == 0 {
tl.ObjectMeta.Annotations = map[string]string{MaxNestedStackDepthKey: fmt.Sprint(depth)} tl.ObjectMeta.Annotations = map[string]string{MaxNestedStackDepthKey: fmt.Sprint(depth)}
@ -360,12 +362,12 @@ func (c *Reconciler) setMaxNestedStackDepth(ctx context.Context, pipelineLoopSpe
} }
} }
func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.CustomRun, status *pipelineloopv1alpha1.PipelineLoopRunStatus) error { func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.CustomRun, status *pipelineloopv1alpha1.PipelineLoopRunStatus, firstIteration bool) error {
ctx = EnableCustomTaskFeatureFlag(ctx) ctx = EnableCustomTaskFeatureFlag(ctx)
logger := logging.FromContext(ctx) logger := logging.FromContext(ctx)
var hashSum string var hashSum string
// Get the PipelineLoop referenced by the CustomRun // Get the PipelineLoop referenced by the CustomRun
pipelineLoopMeta, pipelineLoopSpec, err := c.getPipelineLoop(ctx, customRun) pipelineLoopMeta, pipelineLoopSpec, err := c.getPipelineLoop(ctx, customRun, firstIteration)
if err != nil { if err != nil {
return nil return nil
} }
@ -431,18 +433,36 @@ func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.Cus
} }
// CustomRun is cancelled, just cancel all the running instance and return // CustomRun is cancelled, just cancel all the running instance and return
if customRun.IsCancelled() { if customRun.IsCancelled() {
var DAGStatus pb.Execution_State
if len(failedPrs) > 0 { if len(failedPrs) > 0 {
customRun.Status.MarkCustomRunFailed(pipelineloopv1alpha1.PipelineLoopRunReasonFailed.String(), customRun.Status.MarkCustomRunFailed(pipelineloopv1alpha1.PipelineLoopRunReasonFailed.String(),
"CustomRun %s/%s was failed", "CustomRun %s/%s was failed",
customRun.Namespace, customRun.Name) customRun.Namespace, customRun.Name)
DAGStatus = pb.Execution_FAILED
} else { } else {
reason := pipelineloopv1alpha1.PipelineLoopRunReasonCancelled.String() reason := pipelineloopv1alpha1.PipelineLoopRunReasonCancelled.String()
if customRun.HasTimedOut(c.clock) { // This check is only possible if we are on tekton 0.27.0 + if customRun.HasTimedOut(c.clock) { // This check is only possible if we are on tekton 0.27.0 +
reason = string(tektonv1beta1.CustomRunReasonTimedOut) reason = string(tektonv1beta1.CustomRunReasonTimedOut)
} }
customRun.Status.MarkCustomRunFailed(reason, "CustomRun %s/%s was cancelled", customRun.Namespace, customRun.Name) customRun.Status.MarkCustomRunFailed(reason, "CustomRun %s/%s was cancelled", customRun.Namespace, customRun.Name)
DAGStatus = pb.Execution_CANCELED
}
if c.runKFPV2Driver == "true" {
options, err := kfptask.ParseParams(customRun)
if err != nil {
logger.Errorf("Run %s/%s is invalid because of %s", customRun.Namespace, customRun.Name, err)
customRun.Status.MarkCustomRunFailed(kfptask.ReasonFailedValidation,
"Run can't be run because it has an invalid param - %v", err)
return err
}
DAGErr := kfptask.UpdateDAGPublisher(ctx, options, DAGStatus)
if err != nil {
logger.Errorf("kfp publisher failed when reconciling Run %s/%s: %v", customRun.Namespace, customRun.Name, DAGErr)
customRun.Status.MarkCustomRunFailed(kfptask.ReasonDriverError,
"kfp publisher execution failed: %v", DAGErr)
return DAGErr
}
} }
for _, currentRunningPr := range currentRunningPrs { for _, currentRunningPr := range currentRunningPrs {
logger.Infof("CustomRun %s/%s is cancelled. Cancelling PipelineRun %s.", customRun.Namespace, customRun.Name, currentRunningPr.Name) logger.Infof("CustomRun %s/%s is cancelled. Cancelling PipelineRun %s.", customRun.Namespace, customRun.Name, currentRunningPr.Name)
if _, err := c.pipelineClientSet.TektonV1().PipelineRuns(customRun.Namespace).Patch(ctx, currentRunningPr.Name, types.JSONPatchType, cancelPatchBytes, metav1.PatchOptions{}); err != nil { if _, err := c.pipelineClientSet.TektonV1().PipelineRuns(customRun.Namespace).Patch(ctx, currentRunningPr.Name, types.JSONPatchType, cancelPatchBytes, metav1.PatchOptions{}); err != nil {
@ -599,16 +619,20 @@ func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.Cus
return nil return nil
} }
func (c *Reconciler) getPipelineLoop(ctx context.Context, customRun *tektonv1beta1.CustomRun) (*metav1.ObjectMeta, *pipelineloopv1alpha1.PipelineLoopSpec, error) { func (c *Reconciler) getPipelineLoop(ctx context.Context, customRun *tektonv1beta1.CustomRun, firstIteration bool) (*metav1.ObjectMeta, *pipelineloopv1alpha1.PipelineLoopSpec, error) {
pipelineLoopMeta := metav1.ObjectMeta{} pipelineLoopMeta := metav1.ObjectMeta{}
pipelineLoopSpec := pipelineloopv1alpha1.PipelineLoopSpec{} pipelineLoopSpec := pipelineloopv1alpha1.PipelineLoopSpec{}
if customRun.Spec.CustomRef != nil && customRun.Spec.CustomRef.Name != "" { if customRun.Spec.CustomRef != nil && customRun.Spec.CustomRef.Name != "" {
// Use the k8 client to get the PipelineLoop rather than the lister. This avoids a timing issue where // Use the k8 client to get the PipelineLoop rather than the Lister on the first reconcile. This avoids a timing issue where
// the PipelineLoop is not yet in the lister cache if it is created at nearly the same time as the Run. // the PipelineLoop is not yet in the lister cache if it is created at nearly the same time as the Run.
// See https://github.com/tektoncd/pipeline/issues/2740 for discussion on this issue. // See https://github.com/tektoncd/pipeline/issues/2740 for discussion on this issue.
// var tl *pipelineloopv1alpha1.PipelineLoop
// tl, err := c.pipelineLoopLister.PipelineLoops(customRun.Namespace).Get(customRun.Spec.Ref.Name) var err error
tl, err := c.pipelineloopClientSet.CustomV1alpha1().PipelineLoops(customRun.Namespace).Get(ctx, customRun.Spec.CustomRef.Name, metav1.GetOptions{}) if firstIteration {
tl, err = c.pipelineloopClientSet.CustomV1alpha1().PipelineLoops(customRun.Namespace).Get(ctx, customRun.Spec.CustomRef.Name, metav1.GetOptions{})
} else {
tl, err = c.pipelineLoopLister.PipelineLoops(customRun.Namespace).Get(customRun.Spec.CustomRef.Name)
}
if err != nil { if err != nil {
customRun.Status.MarkCustomRunFailed(pipelineloopv1alpha1.PipelineLoopRunReasonCouldntGetPipelineLoop.String(), customRun.Status.MarkCustomRunFailed(pipelineloopv1alpha1.PipelineLoopRunReasonCouldntGetPipelineLoop.String(),
"Error retrieving PipelineLoop for CustomRun %s/%s: %s", "Error retrieving PipelineLoop for CustomRun %s/%s: %s",

View File

@ -1,34 +0,0 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.20.4-alpine3.17 as builder
WORKDIR /go/src/github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver
COPY . .
# Needed musl-dev for github.com/mattn/go-sqlite3
RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev patch
RUN go mod vendor && patch -u vendor/k8s.io/klog/v2/klog.go pkg/controller/klog.patch
RUN CGO_ENABLED=0 GO111MODULE=on go build -mod=vendor -o /bin/controller cmd/controller/*.go && rm -rf vendor
FROM alpine:3.17
WORKDIR /bin
COPY --from=builder /bin/controller /bin/controller
RUN chmod +x /bin/controller
RUN apk --no-cache add tzdata
CMD /bin/controller

View File

@ -1 +0,0 @@
2.0.3

View File

@ -1,26 +0,0 @@
/*
// Copyright 2023 kubeflow.org
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/
package main
import (
"github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver/pkg/controller"
"knative.dev/pkg/injection/sharedmain"
)
func main() {
sharedmain.Main(controller.ControllerName, controller.NewController)
}

View File

@ -1,260 +0,0 @@
// Copyright 2021-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/config"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
)
const (
driverTypeArg = "type"
)
var (
// inputs
driverType = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
pipelineName = flag.String("pipeline_name", "", "pipeline context name")
runID = flag.String("run_id", "", "pipeline run uid")
componentSpecJson = flag.String("component", "{}", "component spec")
taskSpecJson = flag.String("task", "", "task spec")
runtimeConfigJson = flag.String("runtime_config", "", "jobruntime config")
iterationIndex = flag.Int("iteration_index", -1, "iteration index, -1 means not an interation")
// container inputs
dagExecutionID = flag.Int64("dag_execution_id", 0, "DAG execution ID")
containerSpecJson = flag.String("container", "{}", "container spec")
k8sExecConfigJson = flag.String("kubernetes_config", "{}", "kubernetes executor config")
// config
mlmdServerAddress = flag.String("mlmd_server_address", "", "MLMD server address")
mlmdServerPort = flag.String("mlmd_server_port", "", "MLMD server port")
// output paths
executionIDPath = flag.String("execution_id_path", "", "Exeucution ID output path")
iterationCountPath = flag.String("iteration_count_path", "", "Iteration Count output path")
podSpecPatchPath = flag.String("pod_spec_patch_path", "", "Pod Spec Patch output path")
// the value stored in the paths will be either 'true' or 'false'
cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
conditionPath = flag.String("condition_path", "", "Condition output path")
)
// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {
func main() {
flag.Parse()
err := drive()
if err != nil {
glog.Exitf("%v", err)
}
}
// Use WARNING default logging level to facilitate troubleshooting.
func init() {
flag.Set("logtostderr", "true")
// Change the WARNING to INFO level for debugging.
flag.Set("stderrthreshold", "WARNING")
}
func validate() error {
if *driverType == "" {
return fmt.Errorf("argument --%s must be specified", driverTypeArg)
}
// validation responsibility lives in driver itself, so we do not validate all other args
return nil
}
func drive() (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("KFP driver: %w", err)
}
}()
ctx := context.Background()
if err = validate(); err != nil {
return err
}
glog.Infof("input ComponentSpec:%s\n", prettyPrint(*componentSpecJson))
componentSpec := &pipelinespec.ComponentSpec{}
if err := jsonpb.UnmarshalString(*componentSpecJson, componentSpec); err != nil {
return fmt.Errorf("failed to unmarshal component spec, error: %w\ncomponentSpec: %v", err, prettyPrint(*componentSpecJson))
}
var taskSpec *pipelinespec.PipelineTaskSpec
if *taskSpecJson != "" {
glog.Infof("input TaskSpec:%s\n", prettyPrint(*taskSpecJson))
taskSpec = &pipelinespec.PipelineTaskSpec{}
if err := jsonpb.UnmarshalString(*taskSpecJson, taskSpec); err != nil {
return fmt.Errorf("failed to unmarshal task spec, error: %w\ntask: %v", err, taskSpecJson)
}
}
glog.Infof("input ContainerSpec:%s\n", prettyPrint(*containerSpecJson))
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
if err := jsonpb.UnmarshalString(*containerSpecJson, containerSpec); err != nil {
return fmt.Errorf("failed to unmarshal container spec, error: %w\ncontainerSpec: %v", err, containerSpecJson)
}
var runtimeConfig *pipelinespec.PipelineJob_RuntimeConfig
if *runtimeConfigJson != "" {
glog.Infof("input RuntimeConfig:%s\n", prettyPrint(*runtimeConfigJson))
runtimeConfig = &pipelinespec.PipelineJob_RuntimeConfig{}
if err := jsonpb.UnmarshalString(*runtimeConfigJson, runtimeConfig); err != nil {
return fmt.Errorf("failed to unmarshal runtime config, error: %w\nruntimeConfig: %v", err, runtimeConfigJson)
}
}
var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
if *k8sExecConfigJson != "" {
glog.Infof("input kubernetesConfig:%s\n", prettyPrint(*k8sExecConfigJson))
k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(*k8sExecConfigJson, k8sExecCfg); err != nil {
return fmt.Errorf("failed to unmarshal Kubernetes config, error: %w\nKubernetesConfig: %v", err, k8sExecConfigJson)
}
}
namespace, err := config.InPodNamespace()
if err != nil {
return err
}
client, err := newMlmdClient()
if err != nil {
return err
}
cacheClient, err := cacheutils.NewClient()
if err != nil {
return err
}
options := driver.Options{
PipelineName: *pipelineName,
RunID: *runID,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
}
var execution *driver.Execution
var driverErr error
switch *driverType {
case "ROOT_DAG":
options.RuntimeConfig = runtimeConfig
execution, driverErr = driver.RootDAG(ctx, options, client)
case "DAG":
execution, driverErr = driver.DAG(ctx, options, client)
case "CONTAINER":
options.Container = containerSpec
options.KubernetesExecutorConfig = k8sExecCfg
execution, driverErr = driver.Container(ctx, options, client, cacheClient)
default:
err = fmt.Errorf("unknown driverType %s", *driverType)
}
if driverErr != nil {
if execution == nil {
return driverErr
}
defer func() {
// Override error with driver error, because driver error is more important.
// However, we continue running, because the following code prints debug info that
// may be helpful for figuring out why this failed.
err = driverErr
}()
}
if execution.ID != 0 {
glog.Infof("output execution.ID=%v", execution.ID)
if *executionIDPath != "" {
if err = writeFile(*executionIDPath, []byte(fmt.Sprint(execution.ID))); err != nil {
return fmt.Errorf("failed to write execution ID to file: %w", err)
}
}
}
if execution.IterationCount != nil {
if err = writeFile(*iterationCountPath, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
return fmt.Errorf("failed to write iteration count to file: %w", err)
}
}
if execution.Cached != nil {
if err = writeFile(*cachedDecisionPath, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
return fmt.Errorf("failed to write cached decision to file: %w", err)
}
}
if execution.Condition != nil {
if err = writeFile(*conditionPath, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
return fmt.Errorf("failed to write condition to file: %w", err)
}
}
if execution.PodSpecPatch != "" {
glog.Infof("output podSpecPatch=\n%s\n", execution.PodSpecPatch)
if *podSpecPatchPath == "" {
return fmt.Errorf("--pod_spec_patch_path is required for container executor drivers")
}
if err = writeFile(*podSpecPatchPath, []byte(execution.PodSpecPatch)); err != nil {
return fmt.Errorf("failed to write pod spec patch to file: %w", err)
}
}
if execution.ExecutorInput != nil {
marshaler := jsonpb.Marshaler{}
executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
if err != nil {
return fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
}
glog.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
}
return nil
}
func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
if err != nil {
return jsonStr
}
return prettyJSON.String()
}
func writeFile(path string, data []byte) (err error) {
if path == "" {
return fmt.Errorf("path is not specified")
}
defer func() {
if err != nil {
err = fmt.Errorf("failed to write to %s: %w", path, err)
}
}()
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return err
}
return ioutil.WriteFile(path, data, 0o644)
}
func newMlmdClient() (*metadata.Client, error) {
mlmdConfig := metadata.DefaultConfig()
if *mlmdServerAddress != "" && *mlmdServerPort != "" {
mlmdConfig.Address = *mlmdServerAddress
mlmdConfig.Port = *mlmdServerPort
}
return metadata.NewClient(mlmdConfig.Address, mlmdConfig.Port)
}

View File

@ -1,123 +0,0 @@
module github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver
require (
github.com/golang/glog v1.1.0
github.com/golang/protobuf v1.5.3
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
github.com/tektoncd/pipeline v0.50.2
k8s.io/api v0.27.1
k8s.io/client-go v0.27.2
knative.dev/pkg v0.0.0-20231011201526-df28feae6d34
)
require (
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
cloud.google.com/go/storage v1.29.0 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.12.5 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-containerregistry v0.15.2 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/wire v0.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
gocloud.dev v0.22.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/api v0.128.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.25.4 // indirect
k8s.io/apimachinery v0.27.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
k8s.io/utils v0.0.0-20230505201702-9f6742963106 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace (
k8s.io/api => k8s.io/api v0.25.9
k8s.io/apimachinery => k8s.io/apimachinery v0.26.5
k8s.io/client-go => k8s.io/client-go v0.25.9
k8s.io/code-generator => k8s.io/code-generator v0.25.9
k8s.io/kubernetes => k8s.io/kubernetes v1.11.1
sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.2.9
)
go 1.19

File diff suppressed because it is too large Load Diff

View File

@ -1,43 +0,0 @@
package controller
import (
context "context"
runInformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/customrun"
customrun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
tkncontroller "github.com/tektoncd/pipeline/pkg/controller"
"k8s.io/client-go/tools/cache"
configmap "knative.dev/pkg/configmap"
controller "knative.dev/pkg/controller"
logging "knative.dev/pkg/logging"
)
const (
ControllerName = "kfp-driver"
apiVersion = "kfp-driver.tekton.dev/v1alpha1"
kind = "KFPDriver"
)
// NewController creates a Reconciler for Run and returns the result of NewImpl.
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
runInformer := runInformer.Get(ctx)
r := &Reconciler{}
impl := customrun.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
AgentName: ControllerName,
}
})
logger.Info("Setting up event handlers")
runInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: tkncontroller.FilterCustomRunRef(apiVersion, kind),
Handler: controller.HandleAll(impl.Enqueue),
})
return impl
}

View File

@ -1,41 +0,0 @@
--- klog.go 2023-02-14 13:31:28.209488578 -0800
+++ vendor/k8s.io/klog/v2/klog.go 2023-02-14 13:33:28.081570621 -0800
@@ -401,25 +401,25 @@
// init sets up the defaults and creates command line flags.
func init() {
- commandLine.StringVar(&logging.logDir, "log_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
- commandLine.StringVar(&logging.logFile, "log_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
- commandLine.Uint64Var(&logging.logFileMaxSizeMB, "log_file_max_size", 1800,
+ commandLine.StringVar(&logging.logDir, "klog_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
+ commandLine.StringVar(&logging.logFile, "klog_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
+ commandLine.Uint64Var(&logging.logFileMaxSizeMB, "klog_file_max_size", 1800,
"Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. "+
"If the value is 0, the maximum file size is unlimited.")
- commandLine.BoolVar(&logging.toStderr, "logtostderr", true, "log to standard error instead of files")
- commandLine.BoolVar(&logging.alsoToStderr, "alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
+ commandLine.BoolVar(&logging.toStderr, "klog_tostderr", true, "log to standard error instead of files")
+ commandLine.BoolVar(&logging.alsoToStderr, "klog_alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
logging.setVState(0, nil, false)
- commandLine.Var(&logging.verbosity, "v", "number for the log level verbosity")
- commandLine.BoolVar(&logging.addDirHeader, "add_dir_header", false, "If true, adds the file directory to the header of the log messages")
- commandLine.BoolVar(&logging.skipHeaders, "skip_headers", false, "If true, avoid header prefixes in the log messages")
- commandLine.BoolVar(&logging.oneOutput, "one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
- commandLine.BoolVar(&logging.skipLogHeaders, "skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
+ commandLine.Var(&logging.verbosity, "klog_v", "number for the log level verbosity")
+ commandLine.BoolVar(&logging.addDirHeader, "klog_add_dir_header", false, "If true, adds the file directory to the header of the log messages")
+ commandLine.BoolVar(&logging.skipHeaders, "klog_skip_headers", false, "If true, avoid header prefixes in the log messages")
+ commandLine.BoolVar(&logging.oneOutput, "klog_one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
+ commandLine.BoolVar(&logging.skipLogHeaders, "klog_skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
logging.stderrThreshold = severityValue{
Severity: severity.ErrorLog, // Default stderrThreshold is ERROR.
}
- commandLine.Var(&logging.stderrThreshold, "stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
- commandLine.Var(&logging.vmodule, "vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
- commandLine.Var(&logging.traceLocation, "log_backtrace_at", "when logging hits line file:N, emit a stack trace")
+ commandLine.Var(&logging.stderrThreshold, "klog_stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
+ commandLine.Var(&logging.vmodule, "klog_vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
+ commandLine.Var(&logging.traceLocation, "klog_backtrace_at", "when logging hits line file:N, emit a stack trace")
logging.settings.contextualLoggingEnabled = true
logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)

View File

@ -1,323 +0,0 @@
package controller
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/golang/protobuf/jsonpb"
v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
v1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
reconciler "knative.dev/pkg/reconciler"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
)
const (
// ReasonFailedValidation indicates that the reason for failure status is that Run failed runtime validation
ReasonFailedValidation = "RunValidationFailed"
// ReasonDriverError indicates that an error is throw while running the driver
ReasonDriverError = "DriverError"
// ReasonDriverSuccess indicates that driver finished successfully
ReasonDriverSuccess = "DriverSuccess"
ExecutionID = "execution-id"
ExecutorInput = "executor-input"
CacheDecision = "cached-decision"
Condition = "condition"
IterationCount = "iteration-count"
PodSpecPatch = "pod-spec-patch"
)
// newReconciledNormal makes a new reconciler event with event type Normal, and reason RunReconciled.
func newReconciledNormal(namespace, name string) reconciler.Event {
return reconciler.NewEvent(v1.EventTypeNormal, "RunReconciled", "Run reconciled: \"%s/%s\"", namespace, name)
}
// Reconciler implements controller.Reconciler for Run resources.
type Reconciler struct {
}
type driverOptions struct {
driverType string
options driver.Options
mlmdClient *metadata.Client
cacheClient *cacheutils.Client
}
// Check that our Reconciler implements Interface
var _ customrun.Interface = (*Reconciler)(nil)
// ReconcileKind implements Interface.ReconcileKind.
func (r *Reconciler) ReconcileKind(ctx context.Context, run *v1beta1.CustomRun) reconciler.Event {
logger := logging.FromContext(ctx)
logger.Infof("Reconciling Run %s/%s", run.Namespace, run.Name)
// If the Run has not started, initialize the Condition and set the start time.
if !run.HasStarted() {
logger.Infof("Starting new Run %s/%s", run.Namespace, run.Name)
run.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if run.Status.StartTime.Sub(run.CreationTimestamp.Time) < 0 {
logger.Warnf("Run %s/%s createTimestamp %s is after the Run started %s", run.Namespace, run.Name, run.CreationTimestamp, run.Status.StartTime)
run.Status.StartTime = &run.CreationTimestamp
}
}
if run.IsDone() {
logger.Infof("Run %s/%s is done", run.Namespace, run.Name)
return nil
}
options, err := parseParams(run)
if err != nil {
logger.Errorf("Run %s/%s is invalid because of %s", run.Namespace, run.Name, err)
run.Status.MarkCustomRunFailed(ReasonFailedValidation,
"Run can't be run because it has an invalid param - %v", err)
return nil
}
runResults, driverErr := execDriver(ctx, options)
if driverErr != nil {
logger.Errorf("kfp-driver execution failed when reconciling Run %s/%s: %v", run.Namespace, run.Name, driverErr)
run.Status.MarkCustomRunFailed(ReasonDriverError,
"kfp-driver execution failed: %v", driverErr)
return nil
}
run.Status.Results = append(run.Status.Results, *runResults...)
run.Status.MarkCustomRunSucceeded(ReasonDriverSuccess, "kfp-driver finished successfully")
return newReconciledNormal(run.Namespace, run.Name)
}
func execDriver(ctx context.Context, options *driverOptions) (*[]v1beta1.CustomRunResult, error) {
var execution *driver.Execution
var err error
logger := logging.FromContext(ctx)
switch options.driverType {
case "ROOT_DAG":
execution, err = driver.RootDAG(ctx, options.options, options.mlmdClient)
case "CONTAINER":
execution, err = driver.Container(ctx, options.options, options.mlmdClient, options.cacheClient)
case "DAG":
execution, err = driver.DAG(ctx, options.options, options.mlmdClient)
case "DAG-PUB":
// no-op for now
return &[]v1beta1.CustomRunResult{}, nil
default:
err = fmt.Errorf("unknown driverType %s", options.driverType)
}
if err != nil {
return nil, err
}
var runResults []v1beta1.CustomRunResult
if execution.ID != 0 {
logger.Infof("output execution.ID=%v", execution.ID)
runResults = append(runResults, v1beta1.CustomRunResult{
Name: ExecutionID,
Value: fmt.Sprint(execution.ID),
})
}
if execution.IterationCount != nil {
count := *execution.IterationCount
// the count would be use as 'to' in PipelineLoop. since PipelineLoop's numberic iteration includes to,
// need to substract 1 to compensate that.
count = count - 1
if count < 0 {
count = 0
}
logger.Infof("output execution.IterationCount=%v, count:=%v", *execution.IterationCount, count)
runResults = append(runResults, v1beta1.CustomRunResult{
Name: IterationCount,
Value: fmt.Sprint(count),
})
}
logger.Infof("output execution.Condition=%v", execution.Condition)
if execution.Condition == nil {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: Condition,
Value: "",
})
} else {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: Condition,
Value: strconv.FormatBool(*execution.Condition),
})
}
if execution.ExecutorInput != nil {
marshaler := jsonpb.Marshaler{}
executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
if err != nil {
return nil, fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
}
logger.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
runResults = append(runResults, v1beta1.CustomRunResult{
Name: ExecutorInput,
Value: fmt.Sprint(executorInputJSON),
})
}
// seems no need to handle the PodSpecPatch
if execution.Cached != nil {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: CacheDecision,
Value: strconv.FormatBool(*execution.Cached),
})
}
if options.driverType == "CONTAINER" {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: PodSpecPatch,
Value: execution.PodSpecPatch,
})
}
return &runResults, nil
}
func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
if err != nil {
return jsonStr
}
return prettyJSON.String()
}
func parseParams(run *v1beta1.CustomRun) (*driverOptions, *apis.FieldError) {
if len(run.Spec.Params) == 0 {
return nil, apis.ErrMissingField("params")
}
opts := &driverOptions{
driverType: "",
}
opts.options.Namespace = run.Namespace
mlmdOpts := metadata.ServerConfig{
Address: "metadata-grpc-service.kubeflow.svc.cluster.local",
Port: "8080",
}
for _, param := range run.Spec.Params {
switch param.Name {
case "type":
opts.driverType = param.Value.StringVal
case "pipeline_name":
opts.options.PipelineName = param.Value.StringVal
case "run_id":
opts.options.RunID = param.Value.StringVal
case "component":
if param.Value.StringVal != "" {
componentSpec := &pipelinespec.ComponentSpec{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, componentSpec); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal component spec, error: %v\ncomponentSpec: %v", err, param.Value.StringVal),
"component",
)
}
opts.options.Component = componentSpec
}
case "task":
if param.Value.StringVal != "" {
taskSpec := &pipelinespec.PipelineTaskSpec{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, taskSpec); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal task spec, error: %v\ntask: %v", err, param.Value.StringVal),
"task",
)
}
opts.options.Task = taskSpec
}
case "runtime_config":
if param.Value.StringVal != "" {
runtimeConfig := &pipelinespec.PipelineJob_RuntimeConfig{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, runtimeConfig); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal runtime config, error: %v\nruntimeConfig: %v", err, param.Value.StringVal),
"runtime-config",
)
}
opts.options.RuntimeConfig = runtimeConfig
}
case "container":
if param.Value.StringVal != "" {
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, containerSpec); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal container spec, error: %v\ncontainerSpec: %v", err, param.Value.StringVal),
"container",
)
}
opts.options.Container = containerSpec
}
case "iteration_index":
if param.Value.StringVal != "" {
tmp, _ := strconv.ParseInt(param.Value.StringVal, 10, 32)
opts.options.IterationIndex = int(tmp)
} else {
opts.options.IterationIndex = -1
}
case "dag_execution_id":
if param.Value.StringVal != "" {
opts.options.DAGExecutionID, _ = strconv.ParseInt(param.Value.StringVal, 10, 64)
}
case "mlmd_server_address":
mlmdOpts.Address = param.Value.StringVal
case "mlmd_server_port":
mlmdOpts.Port = param.Value.StringVal
case "kubernetes_config":
var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
if param.Value.StringVal != "" {
k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, k8sExecCfg); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal Kubernetes config, error: %v\nKubernetesConfig: %v", err, param.Value.StringVal),
"kubernetes_config",
)
}
opts.options.KubernetesExecutorConfig = k8sExecCfg
}
}
}
//Check all options
if opts.driverType == "" {
return nil, apis.ErrMissingField("type")
}
if opts.options.RunID == "" {
return nil, apis.ErrMissingField("run-id")
}
if opts.driverType == "ROOT_DAG" && opts.options.RuntimeConfig == nil {
return nil, apis.ErrMissingField("runtime-config")
}
client, err := metadata.NewClient(mlmdOpts.Address, mlmdOpts.Port)
if err != nil {
return nil, apis.ErrGeneric(fmt.Sprintf("can't estibalish MLMD connection: %v", err))
}
opts.mlmdClient = client
cacheClient, err := cacheutils.NewClient()
if err != nil {
return nil, apis.ErrGeneric(fmt.Sprintf("can't estibalish cache connection: %v", err))
}
opts.cacheClient = cacheClient
return opts, nil
}

View File

@ -1 +1 @@
2.0.3 2.1.0

View File

@ -1 +1 @@
2.0.3 2.1.0

View File

@ -3,15 +3,15 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask
require ( require (
github.com/golang/protobuf v1.5.3 github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.1 github.com/google/uuid v1.3.1
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03 github.com/kubeflow/pipelines v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03 github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03 github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03 github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.8.4
github.com/tektoncd/pipeline v0.53.2 github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.26.0 go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0 google.golang.org/protobuf v1.31.0
k8s.io/api v0.27.1 k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.3 k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2 k8s.io/client-go v0.27.2
k8s.io/utils v0.0.0-20230505201702-9f6742963106 k8s.io/utils v0.0.0-20230505201702-9f6742963106
@ -27,7 +27,7 @@ require (
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect github.com/aws/aws-sdk-go v1.45.25 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect github.com/blendle/zapdriver v1.3.1 // indirect
@ -37,7 +37,7 @@ require (
github.com/emicklei/go-restful/v3 v3.10.2 // indirect github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect
@ -45,7 +45,7 @@ require (
github.com/go-openapi/swag v0.22.3 // indirect github.com/go-openapi/swag v0.22.3 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.12.6 // indirect github.com/google/cel-go v0.12.6 // indirect
github.com/google/gnostic v0.6.9 // indirect github.com/google/gnostic v0.6.9 // indirect
@ -75,11 +75,12 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect go.opencensus.io v0.24.0 // indirect
@ -89,7 +90,7 @@ require (
gocloud.dev v0.22.0 // indirect gocloud.dev v0.22.0 // indirect
golang.org/x/crypto v0.17.0 // indirect golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/net v0.17.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.15.0 // indirect golang.org/x/sys v0.15.0 // indirect
@ -107,7 +108,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.5 // indirect k8s.io/apiextensions-apiserver v0.27.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect

File diff suppressed because it is too large Load Diff

View File

@ -295,6 +295,10 @@ func ParseParams(run *tektonv1beta1.CustomRun) (*driverOptions, *apis.FieldError
return opts, nil return opts, nil
} }
func GetKubernetesExecutorConfig(options *driverOptions) *kubernetesplatform.KubernetesExecutorConfig {
return options.options.KubernetesExecutorConfig
}
func prettyPrint(jsonStr string) string { func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ") err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")

View File

@ -27,6 +27,7 @@ import (
kfptaskClient "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/clientset/versioned" kfptaskClient "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/clientset/versioned"
kfptaskListers "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/listers/kfptask/v1alpha1" kfptaskListers "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/listers/kfptask/v1alpha1"
"github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/common" "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/common"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
@ -135,13 +136,13 @@ var annotationToDrop = map[string]string{
} }
// transite to next state based on current state // transite to next state based on current state
func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatch string) error { func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatch string, executorConfig *kubernetesplatform.KubernetesExecutorConfig) error {
kts.logger.Infof("kts state is %s", kts.state) kts.logger.Infof("kts state is %s", kts.state)
switch kts.state { switch kts.state {
case StateInit: case StateInit:
// create the corresponding TaskRun CRD and start the task // create the corresponding TaskRun CRD and start the task
// compose TaskRun // compose TaskRun
tr, err := kts.constructTaskRun(executionID, executorInput, podSpecPatch) tr, err := kts.constructTaskRun(executionID, executorInput, podSpecPatch, executorConfig)
if err != nil { if err != nil {
kts.logger.Infof("Failed to construct a TaskRun:%v", err) kts.logger.Infof("Failed to construct a TaskRun:%v", err)
kts.run.Status.MarkCustomRunFailed(kfptaskv1alpha1.KfpTaskRunReasonInternalError.String(), "Failed to construct a TaskRun: %v", err) kts.run.Status.MarkCustomRunFailed(kfptaskv1alpha1.KfpTaskRunReasonInternalError.String(), "Failed to construct a TaskRun: %v", err)
@ -196,7 +197,47 @@ func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatc
return nil return nil
} }
func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string, podSpecPatch string) (*tektonv1.TaskRun, error) { // Extends the PodMetadata to include Kubernetes-specific executor config.
// Although the current podMetadata object is always empty, this function
// doesn't overwrite the existing podMetadata because for security reasons
// the existing podMetadata should have higher privilege than the user definition.
func extendPodMetadata(
podMetadata *metav1.ObjectMeta,
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig,
) {
// Get pod metadata information
if kubernetesExecutorConfig.GetPodMetadata() != nil {
if kubernetesExecutorConfig.GetPodMetadata().GetLabels() != nil {
if podMetadata.Labels == nil {
podMetadata.Labels = kubernetesExecutorConfig.GetPodMetadata().GetLabels()
} else {
podMetadata.Labels = extendMetadataMap(podMetadata.Labels, kubernetesExecutorConfig.GetPodMetadata().GetLabels())
}
}
if kubernetesExecutorConfig.GetPodMetadata().GetAnnotations() != nil {
if podMetadata.Annotations == nil {
podMetadata.Annotations = kubernetesExecutorConfig.GetPodMetadata().GetAnnotations()
} else {
podMetadata.Annotations = extendMetadataMap(podMetadata.Annotations, kubernetesExecutorConfig.GetPodMetadata().GetAnnotations())
}
}
}
}
// Extends metadata map values, highPriorityMap should overwrites lowPriorityMap values
// The original Map inputs should have higher priority since its defined by admin
// TODO: Use maps.Copy after moving to go 1.21+
func extendMetadataMap(
highPriorityMap map[string]string,
lowPriorityMap map[string]string,
) map[string]string {
for k, v := range highPriorityMap {
lowPriorityMap[k] = v
}
return lowPriorityMap
}
func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string, podSpecPatch string, executorConfig *kubernetesplatform.KubernetesExecutorConfig) (*tektonv1.TaskRun, error) {
ktSpec, err := kts.reconciler.getKfpTaskSpec(kts.ctx, kts.run) ktSpec, err := kts.reconciler.getKfpTaskSpec(kts.ctx, kts.run)
if err != nil { if err != nil {
return nil, err return nil, err
@ -235,6 +276,10 @@ func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string,
}, },
} }
if executorConfig != nil {
extendPodMetadata(&tr.ObjectMeta, executorConfig)
}
if podSpecPatch != "" { if podSpecPatch != "" {
podSpec, err := parseTaskSpecPatch(podSpecPatch) podSpec, err := parseTaskSpecPatch(podSpecPatch)
if err != nil { if err != nil {
@ -312,7 +357,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
return nil return nil
} }
if ktstate.isRunning() { if ktstate.isRunning() {
return ktstate.next("", "", "") return ktstate.next("", "", "", nil)
} }
options, err := common.ParseParams(run) options, err := common.ParseParams(run)
if err != nil { if err != nil {
@ -321,6 +366,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
"Run can't be run because it has an invalid param - %v", err) "Run can't be run because it has an invalid param - %v", err)
return nil return nil
} }
executorConfig := common.GetKubernetesExecutorConfig(options)
runResults, runTask, executionID, executorInput, podSpecPatch, driverErr := common.ExecDriver(ctx, options) runResults, runTask, executionID, executorInput, podSpecPatch, driverErr := common.ExecDriver(ctx, options)
if driverErr != nil { if driverErr != nil {
@ -341,7 +387,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
return nil return nil
} }
return ktstate.next(executionID, executorInput, podSpecPatch) return ktstate.next(executionID, executorInput, podSpecPatch, executorConfig)
} }
func (r *Reconciler) FinalizeKind(ctx context.Context, run *tektonv1beta1.CustomRun) reconciler.Event { func (r *Reconciler) FinalizeKind(ctx context.Context, run *tektonv1beta1.CustomRun) reconciler.Event {