Compare commits: v1.9.2-sdk ... master (17 commits)

| SHA1 |
|---|
| 0b89419544 |
| 3e7950ffd3 |
| bb47bcd892 |
| c0d25310d5 |
| b49f959db9 |
| 16e781dce9 |
| 803377e899 |
| db6d85ece6 |
| 9f568f2a72 |
| a9d7df96d2 |
| b77e6f38d5 |
| bb06e5e721 |
| 550a827b05 |
| d5fc9fd5c9 |
| a71ba164ad |
| ff8bb50dc4 |
| 08e438099a |
@@ -9,14 +9,13 @@ on:
env:
  GITHUB_ACTION: "true"
  SETUPTOOLS_USE_DISTUTILS: "stdlib"

jobs:
  python-unittest:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.8', '3.9', '3.10', '3.11']
        python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
    steps:
    - uses: actions/checkout@v4
    - name: Set up Python ${{ matrix.python-version }}
@@ -66,6 +66,7 @@ spec:
      command: ["/bin/bash", "-c"]
      args:
        - set -ex;
          rm -r /artifacts/*;
          cd /artifacts && git clone -q -b $GIT_BRANCH $GIT_URL .;
          GIT_COMMIT=$(git rev-parse HEAD);
          source ./scripts/deploy/iks/run-test.sh;
@@ -9,7 +9,7 @@ just a few small guidelines you need to follow.
<!-- START of ToC generated by running ./tools/mdtoc.sh CONTRIBUTING.md -->

- [Project Structure](#project-structure)
- [Contributor License Agreement](#contributor-license-agreement)
- [Legal](#legal)
- [Coding Style](#coding-style)
- [Unit Testing Best Practices](#unit-testing-best-practices)
- [Golang](#golang)
@@ -35,17 +35,11 @@ To get started, see the development guides:
* [Backend development guide](./backend/README.md)
* [SDK development guide](./sdk/python/README.md)

## Contributor License Agreement
## Legal

Contributions to this project must be accompanied by a Contributor License
Agreement. You (or your employer) retain the copyright to your contribution;
this simply gives us permission to use and redistribute your contributions as
part of the project. Head over to <https://cla.developers.google.com/> to see
your current agreements on file or to sign a new one.
Kubeflow uses Developer Certificate of Origin ([DCO](https://github.com/apps/dco/)).

You generally only need to submit a CLA once, so if you've already submitted one
(even if it was for a different project), you probably don't need to do it
again.
Please see https://github.com/kubeflow/community/tree/master/dco-signoff-hook#signing-off-commits to learn how to sign off your commits.
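For reference, the DCO sign-off is just the `-s` flag on `git commit`, which appends a `Signed-off-by` trailer with your configured git name and email (a minimal example, not part of the quoted file above):

```shell
# sign off a new commit
git commit -s -m "fix(sdk): describe your change"

# add a missing sign-off to the latest commit
git commit --amend -s --no-edit
```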

## Coding Style
Makefile

@@ -16,7 +16,7 @@
# - The help target was derived from https://stackoverflow.com/a/35730328/5601796

VENV ?= .venv
KFP_TEKTON_RELEASE ?= v1.9.1
KFP_TEKTON_RELEASE ?= v1.9.2
export VIRTUAL_ENV := $(abspath ${VENV})
export PATH := ${VIRTUAL_ENV}/bin:${PATH}
DOCKER_REGISTRY ?= aipipeline
OWNERS

@@ -6,6 +6,7 @@ approvers:
- pugangxa
- scrapcodes
- yhwang
- rafalbigaj
reviewers:
- ckadner
- Tomcli
@@ -11,8 +11,6 @@ according to this [design doc](http://bit.ly/kfp-tekton). The current code allow

For more details about the project please follow this detailed [blog post](https://developer.ibm.com/blogs/awb-tekton-optimizations-for-kubeflow-pipelines-2-0). For the latest KFP-Tekton V2 implementation and [supported offerings](https://developer.ibm.com/articles/advance-machine-learning-workflows-with-ibm-watson-pipelines/), please follow our latest [Kubecon Talk](https://www.youtube.com/watch?v=ecx-yp4g7YU) and [slides](https://docs.google.com/presentation/d/1Su42ApXzZvVwhNSYRAk3bd0heHOtrdEX/edit?usp=sharing&ouid=103716780892927252554&rtpof=true&sd=true). For information on the KFP-Tekton V1 implementation, look at these [slides](https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976) as well as this [deep dive presentation](https://www.youtube.com/watch?v=AYIeNtXLT_k) for demos.

**Note**: If you are interested in a sister project built on top of Kubeflow Pipelines with Tekton, please try [Machine Learning eXchange (MLX)](https://github.com/machine-learning-exchange), Data and AI Assets Catalog and Execution Engine. It introduces a 'Component Registry' for Kubeflow Pipelines, amongst other things.

## Architecture

We are currently using [Kubeflow Pipelines 1.8.4](https://github.com/kubeflow/pipelines/releases/tag/1.8.4) and
@@ -1,3 +1,17 @@
// Copyright 2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package template

import (
@@ -0,0 +1,131 @@
// Copyright 2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package template

import (
    "testing"

    "github.com/stretchr/testify/assert"
    corev1 "k8s.io/api/core/v1"
)

func TestGetConfigMapVolumeSource(t *testing.T) {
    t.Run("Returns correct volume source", func(t *testing.T) {
        // Create a new instance of Tekton
        tekton := &Tekton{}

        // Define the expected volume source
        expectedVolumeSource := corev1.Volume{
            Name: "testVolume",
            VolumeSource: corev1.VolumeSource{
                ConfigMap: &corev1.ConfigMapVolumeSource{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: "testConfigMap",
                    },
                },
            },
        }

        // Call the getConfigMapVolumeSource method
        volumeSource := tekton.getConfigMapVolumeSource("testVolume", "testConfigMap")

        // Assert that the returned volume source matches the expected volume source
        assert.Equal(t, expectedVolumeSource, volumeSource)
    })
}
func TestGetHostPathVolumeSource(t *testing.T) {
    t.Run("Returns correct volume source", func(t *testing.T) {
        // Create a new instance of Tekton
        tekton := &Tekton{}

        // Define the expected volume source
        expectedVolumeSource := corev1.Volume{
            Name: "testVolume",
            VolumeSource: corev1.VolumeSource{
                HostPath: &corev1.HostPathVolumeSource{
                    Path: "testPath",
                },
            },
        }

        // Call the getHostPathVolumeSource method
        volumeSource := tekton.getHostPathVolumeSource("testVolume", "testPath")

        // Assert that the returned volume source matches the expected volume source
        assert.Equal(t, expectedVolumeSource, volumeSource)
    })
}

func TestGetEnvVar(t *testing.T) {
    t.Run("Returns correct environment variable", func(t *testing.T) {
        // Create a new instance of Tekton
        tekton := &Tekton{}

        // Define the input parameters
        name := "testName"
        value := "testValue"

        // Call the getEnvVar method
        envVar := tekton.getEnvVar(name, value)

        // Assert that the returned environment variable has the correct name and value
        assert.Equal(t, name, envVar.Name)
        assert.Equal(t, value, envVar.Value)
    })
}

func TestGetSecretKeySelector(t *testing.T) {
    t.Run("Returns correct environment variable", func(t *testing.T) {
        // Create a new instance of Tekton
        tekton := &Tekton{}

        // Define the input parameters
        name := "testName"
        objectName := "testObjectName"
        objectKey := "testObjectKey"

        // Call the getSecretKeySelector method
        envVar := tekton.getSecretKeySelector(name, objectName, objectKey)

        // Assert that the returned environment variable has the correct name
        assert.Equal(t, name, envVar.Name)

        // Assert that the returned environment variable has the correct SecretKeyRef
        assert.NotNil(t, envVar.ValueFrom.SecretKeyRef)
        assert.Equal(t, objectName, envVar.ValueFrom.SecretKeyRef.LocalObjectReference.Name)
        assert.Equal(t, objectKey, envVar.ValueFrom.SecretKeyRef.Key)
    })
}

func TestGetObjectFieldSelector(t *testing.T) {
    t.Run("Returns correct environment variable", func(t *testing.T) {
        // Create a new instance of Tekton
        tekton := &Tekton{}

        // Define the input parameters
        name := "testName"
        fieldPath := "testFieldPath"

        // Call the getObjectFieldSelector method
        envVar := tekton.getObjectFieldSelector(name, fieldPath)

        // Assert that the returned environment variable has the correct name
        assert.Equal(t, name, envVar.Name)

        // Assert that the returned environment variable has the correct ObjectFieldSelector
        assert.NotNil(t, envVar.ValueFrom.FieldRef)
        assert.Equal(t, fieldPath, envVar.ValueFrom.FieldRef.FieldPath)
    })
}
@@ -52,8 +52,8 @@ Each new KFP-Tekton version is based on the long-term support of the Tekton Pipe
| 1.7.x | 0.47.x | 1.11 | V1beta1 | 1.16.0 |
| 1.8.x | 0.50.x | 1.12 | V1 | 2.11.3 |
| 1.9.x | 0.53.x | 1.13 | V1 | 2.11.3 |
| 2.0.x | 0.47.x | 1.11 | V1beta1 | 1.16.0 |
| 2.1.x | 0.53.x | 1.13 | V1 | 1.16.0 |
| 2.0.3 | 0.47.x | 1.11 | V1beta1 | 1.16.0 |
| 2.0.5 | 0.53.x | 1.13 | V1 | 1.16.0 |

## Standalone Kubeflow Pipelines V1 with Tekton Backend Deployment
@@ -73,17 +73,12 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
      -p '{"data":{"default-timeout-minutes": "0"}}'
   ```

3. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.1` [custom resource definitions](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)(CRDs).
3. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.2` deployment
   ```shell
   kubectl apply --selector kubeflow/crd-install=true -f https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.1/kfp-tekton.yaml
   kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/kfp-template\?ref\=v1.9.2
   ```

4. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.1` deployment
   ```shell
   kubectl apply -f https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.1/kfp-tekton.yaml
   ```

5. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
4. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
   ```shell
   kubectl patch svc ml-pipeline-ui -n kubeflow -p '{"spec": {"type": "LoadBalancer"}}'
   ```
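If a LoadBalancer cannot be provisioned in your cluster, a plain port-forward is a common alternative for local access (a sketch, assuming the `ml-pipeline-ui` service in the `kubeflow` namespace serves on port 80):

```shell
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
# then open http://localhost:8080
```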
@@ -93,15 +88,15 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
   kubectl get svc ml-pipeline-ui -n kubeflow -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
   ```

6. (GPU worker nodes only) If your Kubernetes cluster has a mixture of CPU and GPU worker nodes, it's recommended to disable the Tekton default affinity assistant so that Tekton won't schedule too many CPU workloads on the GPU nodes.
5. (GPU worker nodes only) If your Kubernetes cluster has a mixture of CPU and GPU worker nodes, it's recommended to disable the Tekton default affinity assistant so that Tekton won't schedule too many CPU workloads on the GPU nodes.
   ```shell
   kubectl patch cm feature-flags -n tekton-pipelines \
      -p '{"data":{"disable-affinity-assistant": "true"}}'
   ```

7. (OpenShift only) If you are running the standalone KFP-Tekton on OpenShift, apply the necessary security context constraint below
6. (OpenShift only) If you are running the standalone KFP-Tekton on OpenShift, apply the necessary security context constraint below
   ```shell
   curl -L https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.1/kfp-tekton.yaml | yq 'del(.spec.template.spec.containers[].securityContext.runAsUser, .spec.template.spec.containers[].securityContext.runAsGroup)' | oc apply -f -
   curl -L https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.2/kfp-tekton.yaml | yq 'del(.spec.template.spec.containers[].securityContext.runAsUser, .spec.template.spec.containers[].securityContext.runAsGroup)' | oc apply -f -
   oc apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/third-party/openshift/standalone
   oc adm policy add-scc-to-user anyuid -z tekton-pipelines-controller
   oc adm policy add-scc-to-user anyuid -z tekton-pipelines-webhook
@@ -111,9 +106,9 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following

To install the standalone Kubeflow Pipelines V2 with Tekton, run the following steps:

1. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v2.0.3` along with Tekton `v0.47.5`
1. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v2.0.5` along with Tekton `v0.53.2`
   ```shell
   kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/platform-agnostic-tekton\?ref\=v2.0.3
   kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/platform-agnostic-tekton\?ref\=v2.0.5
   ```

2. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
@@ -135,7 +130,7 @@ To install the standalone Kubeflow Pipelines V2 with Tekton, run the following s
Now, please use the [KFP V2 Python SDK](https://pypi.org/project/kfp/) to compile KFP-Tekton V2 pipelines because we are sharing the same pipeline spec starting from KFP V2.0.0.

```shell
pip install "kfp>=2.0" "kfp-kubernetes>=1.0.0"
pip install "kfp>=2.6.0" "kfp-kubernetes>=1.1.0"
```
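As a minimal sketch of what compiling against the shared KFP V2 pipeline spec looks like (the component and pipeline names below are made up for illustration):

```python
from kfp import dsl, compiler

@dsl.component
def echo(msg: str) -> str:
    # A lightweight Python component that simply prints and returns its input.
    print(msg)
    return msg

@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(msg: str = "hello"):
    echo(msg=msg)

# Produces an IR YAML package that the KFP-Tekton V2 backend can run.
compiler.Compiler().compile(hello_pipeline, package_path="hello_pipeline.yaml")
```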

## Standalone Kubeflow Pipelines with Openshift Pipelines Backend Deployment
File diff suppressed because it is too large
@@ -11,4 +11,4 @@ commonLabels:
images:
- name: gcr.io/ml-pipeline/cache-server
  newName: quay.io/aipipeline/cache-server
  newTag: 1.9.1
  newTag: 1.9.2
@@ -42,20 +42,20 @@ patches:
images:
- name: gcr.io/ml-pipeline/api-server
  newName: quay.io/aipipeline/api-server
  newTag: 1.9.1
  newTag: 1.9.2
- name: gcr.io/ml-pipeline/persistenceagent
  newName: quay.io/aipipeline/persistenceagent
  newTag: 1.9.1
  newTag: 1.9.2
- name: gcr.io/ml-pipeline/scheduledworkflow
  newName: quay.io/aipipeline/scheduledworkflow
  newTag: 1.9.1
  newTag: 1.9.2
- name: gcr.io/ml-pipeline/frontend
  newName: quay.io/aipipeline/frontend
  newTag: 1.9.1
  newTag: 1.9.2
- name: gcr.io/ml-pipeline/viewer-crd-controller
  newTag: 1.8.4
- name: gcr.io/ml-pipeline/visualization-server
  newTag: 1.8.4
- name: gcr.io/ml-pipeline/metadata-writer
  newName: quay.io/aipipeline/metadata-writer
  newTag: 1.9.1
  newTag: 1.9.2
@@ -8,6 +8,6 @@ namespace: tekton-pipelines

images:
- name: quay.io/aipipeline/pipelineloop-controller
  newTag: 1.9.1
  newTag: 1.9.2
- name: quay.io/aipipeline/pipelineloop-webhook
  newTag: 1.9.1
  newTag: 1.9.2
@@ -61,7 +61,7 @@ adding the `TektonCompiler` and the `TektonClient`:

## Project Prerequisites

- Python: `3.8` or later
- Python: `3.8` or later. For Python 3.12, make sure the `SETUPTOOLS_USE_DISTUTILS` environment variable is not set, because that flag is already [deprecated](https://github.com/pypa/setuptools/issues/4002) (see the snippet after this list).
- Tekton: [`v0.53.2`](https://github.com/tektoncd/pipeline/releases/tag/v0.53.2) or [later](https://github.com/tektoncd/pipeline/releases/latest)
- Tekton CLI: [`0.30.1`](https://github.com/tektoncd/cli/releases/tag/v0.30.1)
- Kubeflow Pipelines: [KFP with Tekton backend](/guides/kfp_tekton_install.md)
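For example, if that variable is exported in your shell or CI environment, drop it before installing or testing the SDK on Python 3.12 (a minimal sketch; the editable-install path follows the SDK development guide referenced above):

```shell
unset SETUPTOOLS_USE_DISTUTILS
pip install -e sdk/python
```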
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = '1.9.2'
__version__ = '1.9.3'

from ._client import TektonClient  # noqa F401
from .k8s_client_helper import env_from_secret  # noqa F401
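A hedged illustration of the re-exported helper (assuming `env_from_secret(env_name, secret_name, secret_key)` builds a Kubernetes env var sourced from a secret; the secret and key names below are made up):

```python
from kfp_tekton.k8s_client_helper import env_from_secret

# Hypothetical: expose a secret value to pipeline steps as an environment variable.
minio_key = env_from_secret('MINIO_ACCESS_KEY', 'mlpipeline-minio-artifact', 'accesskey')
```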
@@ -16,6 +16,7 @@
import os
import tempfile
import time
import warnings

import datetime
from typing import Mapping, Callable, Optional

@@ -27,6 +28,7 @@ from kfp_tekton_server_api import ApiException

from .compiler import TektonCompiler
from .compiler.pipeline_utils import TektonPipelineConf
from kfp._auth import get_auth_token, get_gcp_access_token

import json
import logging

@@ -106,7 +108,8 @@ class TektonClient(kfp.Client):
                 ssl_ca_cert=None,
                 kube_context=None,
                 credentials=None,
                 ui_host=None):
                 ui_host=None,
                 verify_ssl=None):
        """Create a new instance of kfp client."""
        host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV)
        self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, ui_host or

@@ -120,7 +123,7 @@ class TektonClient(kfp.Client):

        config = self._load_config(host, client_id, namespace, other_client_id,
                                   other_client_secret, existing_token, proxy,
                                   ssl_ca_cert, kube_context, credentials)
                                   ssl_ca_cert, kube_context, credentials, verify_ssl)
        # Save the loaded API client configuration, as a reference if update is
        # needed.
        self._load_context_setting_or_default()

@@ -162,7 +165,106 @@ class TektonClient(kfp.Client):
        except FileNotFoundError:
            logging.info(
                'Failed to automatically set namespace.', exc_info=False)

    def _load_config(self, host, client_id, namespace, other_client_id,
                     other_client_secret, existing_token, proxy, ssl_ca_cert,
                     kube_context, credentials, verify_ssl):
        config = kfp_server_api.configuration.Configuration()

        if proxy:
            # https://github.com/kubeflow/pipelines/blob/c6ac5e0b1fd991e19e96419f0f508ec0a4217c29/backend/api/python_http_client/kfp_server_api/rest.py#L100
            config.proxy = proxy
        if verify_ssl is not None:
            config.verify_ssl = verify_ssl

        if ssl_ca_cert:
            config.ssl_ca_cert = ssl_ca_cert

        host = host or ''

        # Defaults to 'https' if host does not contain 'http' or 'https' protocol.
        if host and not host.startswith('http'):
            warnings.warn(
                'The host %s does not contain the "http" or "https" protocol.'
                ' Defaults to "https".' % host)
            host = 'https://' + host

        # Preprocess the host endpoint to prevent some common user mistakes.
        if not client_id:
            # always preserving the protocol (http://localhost requires it)
            host = host.rstrip('/')

        if host:
            config.host = host

        token = None

        # "existing_token" is designed to accept token generated outside of SDK. Here is an example.
        #
        # https://cloud.google.com/functions/docs/securing/function-identity
        # https://cloud.google.com/endpoints/docs/grpc/service-account-authentication
        #
        # import requests
        # import kfp
        #
        # def get_access_token():
        #     url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token'
        #     r = requests.get(url, headers={'Metadata-Flavor': 'Google'})
        #     r.raise_for_status()
        #     access_token = r.json()['access_token']
        #     return access_token
        #
        # client = kfp.Client(host='<KFPHost>', existing_token=get_access_token())
        #
        if existing_token:
            token = existing_token
            self._is_refresh_token = False
        elif client_id:
            token = get_auth_token(client_id, other_client_id,
                                   other_client_secret)
            self._is_refresh_token = True
        elif self._is_inverse_proxy_host(host):
            token = get_gcp_access_token()
            self._is_refresh_token = False
        elif credentials:
            config.api_key['authorization'] = 'placeholder'
            config.api_key_prefix['authorization'] = 'Bearer'
            config.refresh_api_key_hook = credentials.refresh_api_key_hook

        if token:
            config.api_key['authorization'] = token
            config.api_key_prefix['authorization'] = 'Bearer'
            return config

        if host:
            # if host is explicitly set with auth token, it's probably a port forward address.
            return config

        import kubernetes as k8s
        in_cluster = True
        try:
            k8s.config.load_incluster_config()
        except:
            in_cluster = False
            pass

        if in_cluster:
            config.host = TektonClient.IN_CLUSTER_DNS_NAME.format(namespace)
            config = self._get_config_with_default_credentials(config)
            return config

        try:
            k8s.config.load_kube_config(
                client_configuration=config, context=kube_context)
        except:
            print('Failed to load kube config.')
            return config

        if config.host:
            config.host = config.host + '/' + TektonClient.KUBE_PROXY_PATH.format(
                namespace)
        return config

    def wait_for_run_completion(self, run_id: str, timeout: int):
        """Waits for a run to complete.
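A minimal, hypothetical usage sketch of the new `verify_ssl` flag wired through above (the endpoint is a placeholder):

```python
from kfp_tekton import TektonClient

# e.g. a test cluster fronted by a self-signed certificate
client = TektonClient(host='https://kfp.example.com/pipeline', verify_ssl=False)
print(client.list_experiments())
```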
@@ -20,14 +20,14 @@
#
# To create a distribution for PyPi run:
#
# $ export KFP_TEKTON_VERSION=1.9.2-rc1
# $ export KFP_TEKTON_VERSION=1.9.3-rc1
# $ python3 setup.py sdist
# $ twine check dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
# $ twine upload --repository pypi dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
#
# ... or:
#
# $ make distribution KFP_TEKTON_VERSION=1.9.2-rc1
# $ make distribution KFP_TEKTON_VERSION=1.9.3-rc1
#
# =============================================================================

@@ -61,7 +61,8 @@ logger.setLevel(logging.INFO)
REQUIRES = [
    "kfp>=1.8.10,<1.8.23",
    "kfp-tekton-server-api==1.8.0rc8",
    "PyYAML>=6,<7"
    "PyYAML>=6,<7",
    "setuptools>=69.1"
]

TESTS_REQUIRE = [
@@ -1 +1 @@
1.8.1
1.9.2
@@ -8,12 +8,12 @@ require (
    github.com/hashicorp/go-multierror v1.1.1
    github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0
    github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0
    github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-00010101000000-000000000000
    github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
    github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20231127195001-a75d4b3711ff
    github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
    github.com/tektoncd/pipeline v0.53.2
    go.uber.org/zap v1.26.0
    gomodules.xyz/jsonpatch/v2 v2.4.0
    k8s.io/api v0.27.1
    k8s.io/api v0.27.2
    k8s.io/apimachinery v0.27.3
    k8s.io/client-go v0.27.2
    k8s.io/utils v0.0.0-20230505201702-9f6742963106

@@ -30,7 +30,7 @@ require (
    contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
    github.com/IBM/ibm-cos-sdk-go v1.8.0 // indirect
    github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
    github.com/aws/aws-sdk-go v1.42.50 // indirect
    github.com/aws/aws-sdk-go v1.45.25 // indirect
    github.com/beorn7/perks v1.0.1 // indirect
    github.com/blang/semver/v4 v4.0.0 // indirect
    github.com/blendle/zapdriver v1.3.1 // indirect

@@ -41,16 +41,16 @@ require (
    github.com/emicklei/go-restful/v3 v3.10.2 // indirect
    github.com/evanphx/json-patch v5.6.0+incompatible // indirect
    github.com/evanphx/json-patch/v5 v5.6.0 // indirect
    github.com/go-kit/log v0.2.0 // indirect
    github.com/go-kit/log v0.2.1 // indirect
    github.com/go-logfmt/logfmt v0.5.1 // indirect
    github.com/go-logr/logr v1.2.4 // indirect
    github.com/go-openapi/jsonpointer v0.19.6 // indirect
    github.com/go-openapi/jsonreference v0.20.2 // indirect
    github.com/go-openapi/swag v0.22.3 // indirect
    github.com/go-sql-driver/mysql v1.6.0 // indirect
    github.com/go-sql-driver/mysql v1.7.1 // indirect
    github.com/gobuffalo/flect v0.2.4 // indirect
    github.com/gogo/protobuf v1.3.2 // indirect
    github.com/golang/glog v1.1.0 // indirect
    github.com/golang/glog v1.2.0 // indirect
    github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
    github.com/golang/protobuf v1.5.3 // indirect
    github.com/google/cel-go v0.12.6 // indirect

@@ -74,21 +74,21 @@ require (
    github.com/josharian/intern v1.0.0 // indirect
    github.com/json-iterator/go v1.1.12 // indirect
    github.com/kelseyhightower/envconfig v1.4.0 // indirect
    github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03 // indirect
    github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03 // indirect
    github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03 // indirect
    github.com/kubeflow/pipelines v0.0.0-20240416215826-da804407ad31 // indirect
    github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31 // indirect
    github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240416215826-da804407ad31 // indirect
    github.com/mailru/easyjson v0.7.7 // indirect
    github.com/mattn/go-sqlite3 v1.14.16 // indirect
    github.com/mattn/go-sqlite3 v1.14.19 // indirect
    github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
    github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
    github.com/modern-go/reflect2 v1.0.2 // indirect
    github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
    github.com/opencontainers/go-digest v1.0.0 // indirect
    github.com/pkg/errors v0.9.1 // indirect
    github.com/prometheus/client_golang v1.14.0 // indirect
    github.com/prometheus/client_golang v1.16.0 // indirect
    github.com/prometheus/client_model v0.4.0 // indirect
    github.com/prometheus/common v0.37.0 // indirect
    github.com/prometheus/procfs v0.8.0 // indirect
    github.com/prometheus/common v0.42.0 // indirect
    github.com/prometheus/procfs v0.10.1 // indirect
    github.com/prometheus/statsd_exporter v0.21.0 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
    github.com/stoewer/go-strcase v1.2.0 // indirect

@@ -100,7 +100,7 @@ require (
    golang.org/x/crypto v0.17.0 // indirect
    golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
    golang.org/x/mod v0.12.0 // indirect
    golang.org/x/net v0.17.0 // indirect
    golang.org/x/net v0.19.0 // indirect
    golang.org/x/oauth2 v0.13.0 // indirect
    golang.org/x/sync v0.4.0 // indirect
    golang.org/x/sys v0.15.0 // indirect

@@ -122,8 +122,8 @@ require (
    gorm.io/driver/mysql v1.4.3 // indirect
    gorm.io/driver/sqlite v1.4.2 // indirect
    gorm.io/gorm v1.24.0 // indirect
    k8s.io/apiextensions-apiserver v0.26.5 // indirect
    k8s.io/code-generator v0.26.5 // indirect
    k8s.io/apiextensions-apiserver v0.27.2 // indirect
    k8s.io/code-generator v0.27.2 // indirect
    k8s.io/gengo v0.0.0-20221011193443-fad74ee6edd9 // indirect
    k8s.io/klog/v2 v2.100.1 // indirect
    k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect

@@ -211,6 +211,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *tektonv1beta1
    }
    logger.Infof("Received control for a CustomRun %s/%s %-v", customRun.Namespace, customRun.Name, customRun.Spec.CustomSpec)
    // If the CustomRun has not started, initialize the Condition and set the start time.
    firstIteration := false
    if !customRun.HasStarted() {
        logger.Infof("Starting new CustomRun %s/%s", customRun.Namespace, customRun.Name)
        customRun.Status.InitializeConditions()

@@ -226,6 +227,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *tektonv1beta1
        // on the event to perform user facing initialisations, such has reset a CI check status
        afterCondition := customRun.Status.GetCondition(apis.ConditionSucceeded)
        events.Emit(ctx, nil, afterCondition, customRun)
        firstIteration = true
    }

    // Store the condition before reconcile

@@ -264,7 +266,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *tektonv1beta1
        return nil
    }
    // Reconcile the Run
    if err := c.reconcile(ctx, customRun, status); err != nil {
    if err := c.reconcile(ctx, customRun, status, firstIteration); err != nil {
        logger.Errorf("Reconcile error: %v", err.Error())
        merr = multierror.Append(merr, err)
    }

@@ -341,7 +343,7 @@ func (c *Reconciler) setMaxNestedStackDepth(ctx context.Context, pipelineLoopSpe
        }
    } else if t.TaskRef != nil {
        if t.TaskRef.Kind == "PipelineLoop" {
            tl, err := c.pipelineloopClientSet.CustomV1alpha1().PipelineLoops(customRun.Namespace).Get(ctx, t.TaskRef.Name, metav1.GetOptions{})
            tl, err := c.pipelineLoopLister.PipelineLoops(customRun.Namespace).Get(t.TaskRef.Name)
            if err == nil && tl != nil {
                if len(tl.ObjectMeta.Annotations) == 0 {
                    tl.ObjectMeta.Annotations = map[string]string{MaxNestedStackDepthKey: fmt.Sprint(depth)}

@@ -360,12 +362,12 @@ func (c *Reconciler) setMaxNestedStackDepth(ctx context.Context, pipelineLoopSpe
    }
}

func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.CustomRun, status *pipelineloopv1alpha1.PipelineLoopRunStatus) error {
func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.CustomRun, status *pipelineloopv1alpha1.PipelineLoopRunStatus, firstIteration bool) error {
    ctx = EnableCustomTaskFeatureFlag(ctx)
    logger := logging.FromContext(ctx)
    var hashSum string
    // Get the PipelineLoop referenced by the CustomRun
    pipelineLoopMeta, pipelineLoopSpec, err := c.getPipelineLoop(ctx, customRun)
    pipelineLoopMeta, pipelineLoopSpec, err := c.getPipelineLoop(ctx, customRun, firstIteration)
    if err != nil {
        return nil
    }

@@ -431,18 +433,36 @@ func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.Cus
    }
    // CustomRun is cancelled, just cancel all the running instance and return
    if customRun.IsCancelled() {
        var DAGStatus pb.Execution_State
        if len(failedPrs) > 0 {
            customRun.Status.MarkCustomRunFailed(pipelineloopv1alpha1.PipelineLoopRunReasonFailed.String(),
                "CustomRun %s/%s was failed",
                customRun.Namespace, customRun.Name)
            DAGStatus = pb.Execution_FAILED
        } else {
            reason := pipelineloopv1alpha1.PipelineLoopRunReasonCancelled.String()
            if customRun.HasTimedOut(c.clock) { // This check is only possible if we are on tekton 0.27.0 +
                reason = string(tektonv1beta1.CustomRunReasonTimedOut)
            }
            customRun.Status.MarkCustomRunFailed(reason, "CustomRun %s/%s was cancelled", customRun.Namespace, customRun.Name)
            DAGStatus = pb.Execution_CANCELED
        }
        if c.runKFPV2Driver == "true" {
            options, err := kfptask.ParseParams(customRun)
            if err != nil {
                logger.Errorf("Run %s/%s is invalid because of %s", customRun.Namespace, customRun.Name, err)
                customRun.Status.MarkCustomRunFailed(kfptask.ReasonFailedValidation,
                    "Run can't be run because it has an invalid param - %v", err)
                return err
            }
            DAGErr := kfptask.UpdateDAGPublisher(ctx, options, DAGStatus)
            if err != nil {
                logger.Errorf("kfp publisher failed when reconciling Run %s/%s: %v", customRun.Namespace, customRun.Name, DAGErr)
                customRun.Status.MarkCustomRunFailed(kfptask.ReasonDriverError,
                    "kfp publisher execution failed: %v", DAGErr)
                return DAGErr
            }
        }

        for _, currentRunningPr := range currentRunningPrs {
            logger.Infof("CustomRun %s/%s is cancelled. Cancelling PipelineRun %s.", customRun.Namespace, customRun.Name, currentRunningPr.Name)
            if _, err := c.pipelineClientSet.TektonV1().PipelineRuns(customRun.Namespace).Patch(ctx, currentRunningPr.Name, types.JSONPatchType, cancelPatchBytes, metav1.PatchOptions{}); err != nil {

@@ -599,16 +619,20 @@ func (c *Reconciler) reconcile(ctx context.Context, customRun *tektonv1beta1.Cus
    return nil
}

func (c *Reconciler) getPipelineLoop(ctx context.Context, customRun *tektonv1beta1.CustomRun) (*metav1.ObjectMeta, *pipelineloopv1alpha1.PipelineLoopSpec, error) {
func (c *Reconciler) getPipelineLoop(ctx context.Context, customRun *tektonv1beta1.CustomRun, firstIteration bool) (*metav1.ObjectMeta, *pipelineloopv1alpha1.PipelineLoopSpec, error) {
    pipelineLoopMeta := metav1.ObjectMeta{}
    pipelineLoopSpec := pipelineloopv1alpha1.PipelineLoopSpec{}
    if customRun.Spec.CustomRef != nil && customRun.Spec.CustomRef.Name != "" {
        // Use the k8 client to get the PipelineLoop rather than the lister. This avoids a timing issue where
        // Use the k8 client to get the PipelineLoop rather than the Lister on the first reconcile. This avoids a timing issue where
        // the PipelineLoop is not yet in the lister cache if it is created at nearly the same time as the Run.
        // See https://github.com/tektoncd/pipeline/issues/2740 for discussion on this issue.
        //
        // tl, err := c.pipelineLoopLister.PipelineLoops(customRun.Namespace).Get(customRun.Spec.Ref.Name)
        tl, err := c.pipelineloopClientSet.CustomV1alpha1().PipelineLoops(customRun.Namespace).Get(ctx, customRun.Spec.CustomRef.Name, metav1.GetOptions{})
        var tl *pipelineloopv1alpha1.PipelineLoop
        var err error
        if firstIteration {
            tl, err = c.pipelineloopClientSet.CustomV1alpha1().PipelineLoops(customRun.Namespace).Get(ctx, customRun.Spec.CustomRef.Name, metav1.GetOptions{})
        } else {
            tl, err = c.pipelineLoopLister.PipelineLoops(customRun.Namespace).Get(customRun.Spec.CustomRef.Name)
        }
        if err != nil {
            customRun.Status.MarkCustomRunFailed(pipelineloopv1alpha1.PipelineLoopRunReasonCouldntGetPipelineLoop.String(),
                "Error retrieving PipelineLoop for CustomRun %s/%s: %s",
@@ -1,34 +0,0 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.20.4-alpine3.17 as builder

WORKDIR /go/src/github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver
COPY . .

# Needed musl-dev for github.com/mattn/go-sqlite3
RUN apk update && apk upgrade && \
    apk add --no-cache bash git openssh gcc musl-dev patch

RUN go mod vendor && patch -u vendor/k8s.io/klog/v2/klog.go pkg/controller/klog.patch
RUN CGO_ENABLED=0 GO111MODULE=on go build -mod=vendor -o /bin/controller cmd/controller/*.go && rm -rf vendor

FROM alpine:3.17
WORKDIR /bin

COPY --from=builder /bin/controller /bin/controller
RUN chmod +x /bin/controller
RUN apk --no-cache add tzdata

CMD /bin/controller

@@ -1 +0,0 @@
2.0.3

@@ -1,26 +0,0 @@
/*
// Copyright 2023 kubeflow.org
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

package main

import (
    "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver/pkg/controller"
    "knative.dev/pkg/injection/sharedmain"
)

func main() {
    sharedmain.Main(controller.ControllerName, controller.NewController)
}

@@ -1,260 +0,0 @@
// Copyright 2021-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "strconv"

    "github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
    "github.com/kubeflow/pipelines/backend/src/v2/driver"
    "github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"

    "github.com/golang/glog"
    "github.com/golang/protobuf/jsonpb"
    "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
    "github.com/kubeflow/pipelines/backend/src/v2/config"
    "github.com/kubeflow/pipelines/backend/src/v2/metadata"
)

const (
    driverTypeArg = "type"
)

var (
    // inputs
    driverType = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
    pipelineName = flag.String("pipeline_name", "", "pipeline context name")
    runID = flag.String("run_id", "", "pipeline run uid")
    componentSpecJson = flag.String("component", "{}", "component spec")
    taskSpecJson = flag.String("task", "", "task spec")
    runtimeConfigJson = flag.String("runtime_config", "", "jobruntime config")
    iterationIndex = flag.Int("iteration_index", -1, "iteration index, -1 means not an interation")

    // container inputs
    dagExecutionID = flag.Int64("dag_execution_id", 0, "DAG execution ID")
    containerSpecJson = flag.String("container", "{}", "container spec")
    k8sExecConfigJson = flag.String("kubernetes_config", "{}", "kubernetes executor config")

    // config
    mlmdServerAddress = flag.String("mlmd_server_address", "", "MLMD server address")
    mlmdServerPort = flag.String("mlmd_server_port", "", "MLMD server port")

    // output paths
    executionIDPath = flag.String("execution_id_path", "", "Exeucution ID output path")
    iterationCountPath = flag.String("iteration_count_path", "", "Iteration Count output path")
    podSpecPatchPath = flag.String("pod_spec_patch_path", "", "Pod Spec Patch output path")
    // the value stored in the paths will be either 'true' or 'false'
    cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
    conditionPath = flag.String("condition_path", "", "Condition output path")
)

// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {

func main() {
    flag.Parse()
    err := drive()
    if err != nil {
        glog.Exitf("%v", err)
    }
}

// Use WARNING default logging level to facilitate troubleshooting.
func init() {
    flag.Set("logtostderr", "true")
    // Change the WARNING to INFO level for debugging.
    flag.Set("stderrthreshold", "WARNING")
}

func validate() error {
    if *driverType == "" {
        return fmt.Errorf("argument --%s must be specified", driverTypeArg)
    }
    // validation responsibility lives in driver itself, so we do not validate all other args
    return nil
}

func drive() (err error) {
    defer func() {
        if err != nil {
            err = fmt.Errorf("KFP driver: %w", err)
        }
    }()
    ctx := context.Background()
    if err = validate(); err != nil {
        return err
    }
    glog.Infof("input ComponentSpec:%s\n", prettyPrint(*componentSpecJson))
    componentSpec := &pipelinespec.ComponentSpec{}
    if err := jsonpb.UnmarshalString(*componentSpecJson, componentSpec); err != nil {
        return fmt.Errorf("failed to unmarshal component spec, error: %w\ncomponentSpec: %v", err, prettyPrint(*componentSpecJson))
    }
    var taskSpec *pipelinespec.PipelineTaskSpec
    if *taskSpecJson != "" {
        glog.Infof("input TaskSpec:%s\n", prettyPrint(*taskSpecJson))
        taskSpec = &pipelinespec.PipelineTaskSpec{}
        if err := jsonpb.UnmarshalString(*taskSpecJson, taskSpec); err != nil {
            return fmt.Errorf("failed to unmarshal task spec, error: %w\ntask: %v", err, taskSpecJson)
        }
    }
    glog.Infof("input ContainerSpec:%s\n", prettyPrint(*containerSpecJson))
    containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
    if err := jsonpb.UnmarshalString(*containerSpecJson, containerSpec); err != nil {
        return fmt.Errorf("failed to unmarshal container spec, error: %w\ncontainerSpec: %v", err, containerSpecJson)
    }
    var runtimeConfig *pipelinespec.PipelineJob_RuntimeConfig
    if *runtimeConfigJson != "" {
        glog.Infof("input RuntimeConfig:%s\n", prettyPrint(*runtimeConfigJson))
        runtimeConfig = &pipelinespec.PipelineJob_RuntimeConfig{}
        if err := jsonpb.UnmarshalString(*runtimeConfigJson, runtimeConfig); err != nil {
            return fmt.Errorf("failed to unmarshal runtime config, error: %w\nruntimeConfig: %v", err, runtimeConfigJson)
        }
    }
    var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
    if *k8sExecConfigJson != "" {
        glog.Infof("input kubernetesConfig:%s\n", prettyPrint(*k8sExecConfigJson))
        k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
        if err := jsonpb.UnmarshalString(*k8sExecConfigJson, k8sExecCfg); err != nil {
            return fmt.Errorf("failed to unmarshal Kubernetes config, error: %w\nKubernetesConfig: %v", err, k8sExecConfigJson)
        }
    }
    namespace, err := config.InPodNamespace()
    if err != nil {
        return err
    }
    client, err := newMlmdClient()
    if err != nil {
        return err
    }
    cacheClient, err := cacheutils.NewClient()
    if err != nil {
        return err
    }
    options := driver.Options{
        PipelineName: *pipelineName,
        RunID: *runID,
        Namespace: namespace,
        Component: componentSpec,
        Task: taskSpec,
        DAGExecutionID: *dagExecutionID,
        IterationIndex: *iterationIndex,
    }
    var execution *driver.Execution
    var driverErr error
    switch *driverType {
    case "ROOT_DAG":
        options.RuntimeConfig = runtimeConfig
        execution, driverErr = driver.RootDAG(ctx, options, client)
    case "DAG":
        execution, driverErr = driver.DAG(ctx, options, client)
    case "CONTAINER":
        options.Container = containerSpec
        options.KubernetesExecutorConfig = k8sExecCfg
        execution, driverErr = driver.Container(ctx, options, client, cacheClient)
    default:
        err = fmt.Errorf("unknown driverType %s", *driverType)
    }
    if driverErr != nil {
        if execution == nil {
            return driverErr
        }
        defer func() {
            // Override error with driver error, because driver error is more important.
            // However, we continue running, because the following code prints debug info that
            // may be helpful for figuring out why this failed.
            err = driverErr
        }()
    }
    if execution.ID != 0 {
        glog.Infof("output execution.ID=%v", execution.ID)
        if *executionIDPath != "" {
            if err = writeFile(*executionIDPath, []byte(fmt.Sprint(execution.ID))); err != nil {
                return fmt.Errorf("failed to write execution ID to file: %w", err)
            }
        }
    }
    if execution.IterationCount != nil {
        if err = writeFile(*iterationCountPath, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
            return fmt.Errorf("failed to write iteration count to file: %w", err)
        }
    }
    if execution.Cached != nil {
        if err = writeFile(*cachedDecisionPath, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
            return fmt.Errorf("failed to write cached decision to file: %w", err)
        }
    }
    if execution.Condition != nil {
        if err = writeFile(*conditionPath, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
            return fmt.Errorf("failed to write condition to file: %w", err)
        }
    }
    if execution.PodSpecPatch != "" {
        glog.Infof("output podSpecPatch=\n%s\n", execution.PodSpecPatch)
        if *podSpecPatchPath == "" {
            return fmt.Errorf("--pod_spec_patch_path is required for container executor drivers")
        }
        if err = writeFile(*podSpecPatchPath, []byte(execution.PodSpecPatch)); err != nil {
            return fmt.Errorf("failed to write pod spec patch to file: %w", err)
        }
    }
    if execution.ExecutorInput != nil {
        marshaler := jsonpb.Marshaler{}
        executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
        if err != nil {
            return fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
        }
        glog.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
    }
    return nil
}

func prettyPrint(jsonStr string) string {
    var prettyJSON bytes.Buffer
    err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
    if err != nil {
        return jsonStr
    }
    return prettyJSON.String()
}

func writeFile(path string, data []byte) (err error) {
    if path == "" {
        return fmt.Errorf("path is not specified")
    }
    defer func() {
        if err != nil {
            err = fmt.Errorf("failed to write to %s: %w", path, err)
        }
    }()
    if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
        return err
    }
    return ioutil.WriteFile(path, data, 0o644)
}

func newMlmdClient() (*metadata.Client, error) {
    mlmdConfig := metadata.DefaultConfig()
    if *mlmdServerAddress != "" && *mlmdServerPort != "" {
        mlmdConfig.Address = *mlmdServerAddress
        mlmdConfig.Port = *mlmdServerPort
    }
    return metadata.NewClient(mlmdConfig.Address, mlmdConfig.Port)
}
@@ -1,123 +0,0 @@
module github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver

require (
    github.com/golang/glog v1.1.0
    github.com/golang/protobuf v1.5.3
    github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
    github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
    github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
    github.com/tektoncd/pipeline v0.50.2
    k8s.io/api v0.27.1
    k8s.io/client-go v0.27.2
    knative.dev/pkg v0.0.0-20231011201526-df28feae6d34
)

require (
    cloud.google.com/go v0.110.2 // indirect
    cloud.google.com/go/compute v1.19.3 // indirect
    cloud.google.com/go/compute/metadata v0.2.3 // indirect
    cloud.google.com/go/iam v1.1.0 // indirect
    cloud.google.com/go/storage v1.29.0 // indirect
    contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
    contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
    github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed // indirect
    github.com/aws/aws-sdk-go v1.42.50 // indirect
    github.com/beorn7/perks v1.0.1 // indirect
    github.com/blang/semver/v4 v4.0.0 // indirect
    github.com/blendle/zapdriver v1.3.1 // indirect
    github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
    github.com/cespare/xxhash/v2 v2.2.0 // indirect
    github.com/davecgh/go-spew v1.1.1 // indirect
    github.com/emicklei/go-restful/v3 v3.10.2 // indirect
    github.com/evanphx/json-patch/v5 v5.6.0 // indirect
    github.com/go-kit/log v0.2.0 // indirect
    github.com/go-logfmt/logfmt v0.5.1 // indirect
    github.com/go-logr/logr v1.2.4 // indirect
    github.com/go-openapi/jsonpointer v0.19.6 // indirect
    github.com/go-openapi/jsonreference v0.20.2 // indirect
    github.com/go-openapi/swag v0.22.3 // indirect
    github.com/gogo/protobuf v1.3.2 // indirect
    github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
    github.com/google/cel-go v0.12.5 // indirect
    github.com/google/gnostic v0.6.9 // indirect
    github.com/google/go-cmp v0.5.9 // indirect
    github.com/google/go-containerregistry v0.15.2 // indirect
    github.com/google/gofuzz v1.2.0 // indirect
    github.com/google/s2a-go v0.1.4 // indirect
    github.com/google/uuid v1.3.0 // indirect
    github.com/google/wire v0.4.0 // indirect
    github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
    github.com/googleapis/gax-go/v2 v2.11.0 // indirect
    github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
    github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
    github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
    github.com/hashicorp/errwrap v1.1.0 // indirect
    github.com/hashicorp/go-multierror v1.1.1 // indirect
    github.com/hashicorp/golang-lru v0.5.4 // indirect
    github.com/imdario/mergo v0.3.13 // indirect
    github.com/jmespath/go-jmespath v0.4.0 // indirect
    github.com/josharian/intern v1.0.0 // indirect
    github.com/json-iterator/go v1.1.12 // indirect
    github.com/kelseyhightower/envconfig v1.4.0 // indirect
    github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800 // indirect
    github.com/mailru/easyjson v0.7.7 // indirect
    github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
    github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
    github.com/modern-go/reflect2 v1.0.2 // indirect
    github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
    github.com/opencontainers/go-digest v1.0.0 // indirect
    github.com/pkg/errors v0.9.1 // indirect
    github.com/prometheus/client_golang v1.13.0 // indirect
    github.com/prometheus/client_model v0.4.0 // indirect
    github.com/prometheus/common v0.37.0 // indirect
    github.com/prometheus/procfs v0.8.0 // indirect
    github.com/prometheus/statsd_exporter v0.21.0 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
    github.com/stoewer/go-strcase v1.2.0 // indirect
    go.opencensus.io v0.24.0 // indirect
    go.uber.org/atomic v1.10.0 // indirect
    go.uber.org/automaxprocs v1.4.0 // indirect
    go.uber.org/multierr v1.8.0 // indirect
    go.uber.org/zap v1.24.0 // indirect
    gocloud.dev v0.22.0 // indirect
    golang.org/x/crypto v0.17.0 // indirect
    golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
    golang.org/x/net v0.17.0 // indirect
    golang.org/x/oauth2 v0.9.0 // indirect
    golang.org/x/sync v0.3.0 // indirect
    golang.org/x/sys v0.15.0 // indirect
    golang.org/x/term v0.15.0 // indirect
    golang.org/x/text v0.14.0 // indirect
    golang.org/x/time v0.3.0 // indirect
    golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
    gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
    google.golang.org/api v0.128.0 // indirect
    google.golang.org/appengine v1.6.7 // indirect
    google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
    google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
    google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
    google.golang.org/grpc v1.56.3 // indirect
    google.golang.org/protobuf v1.30.0 // indirect
    gopkg.in/inf.v0 v0.9.1 // indirect
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
    k8s.io/apiextensions-apiserver v0.25.4 // indirect
    k8s.io/apimachinery v0.27.2 // indirect
    k8s.io/klog/v2 v2.100.1 // indirect
    k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
    k8s.io/utils v0.0.0-20230505201702-9f6742963106 // indirect
    sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
    sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
    sigs.k8s.io/yaml v1.3.0 // indirect
)

replace (
    k8s.io/api => k8s.io/api v0.25.9
    k8s.io/apimachinery => k8s.io/apimachinery v0.26.5
    k8s.io/client-go => k8s.io/client-go v0.25.9
    k8s.io/code-generator => k8s.io/code-generator v0.25.9
    k8s.io/kubernetes => k8s.io/kubernetes v1.11.1
    sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.2.9
)

go 1.19
File diff suppressed because it is too large
@@ -1,43 +0,0 @@
package controller

import (
	context "context"

	runInformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/customrun"
	customrun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
	tkncontroller "github.com/tektoncd/pipeline/pkg/controller"
	"k8s.io/client-go/tools/cache"
	configmap "knative.dev/pkg/configmap"
	controller "knative.dev/pkg/controller"
	logging "knative.dev/pkg/logging"
)

const (
	ControllerName = "kfp-driver"
	apiVersion     = "kfp-driver.tekton.dev/v1alpha1"
	kind           = "KFPDriver"
)

// NewController creates a Reconciler for Run and returns the result of NewImpl.
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
	logger := logging.FromContext(ctx)

	runInformer := runInformer.Get(ctx)

	r := &Reconciler{}

	impl := customrun.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
		return controller.Options{
			AgentName: ControllerName,
		}
	})

	logger.Info("Setting up event handlers")

	runInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: tkncontroller.FilterCustomRunRef(apiVersion, kind),
		Handler:    controller.HandleAll(impl.Enqueue),
	})

	return impl
}
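A minimal sketch of how a constructor like the deleted NewController above is typically wired into a controller binary with knative's sharedmain (an assumption for illustration: the import path and binary name below are hypothetical and not taken from this repository):

package main

import (
	"knative.dev/pkg/injection/sharedmain"

	kfpdriver "example.com/kfp-driver/pkg/controller" // hypothetical import path
)

func main() {
	// sharedmain sets up informers, config watching, and signal handling, then
	// calls NewController(ctx, cmw) to build and run the reconciler loop.
	sharedmain.Main("kfp-driver-controller", kfpdriver.NewController)
}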
@@ -1,41 +0,0 @@
--- klog.go	2023-02-14 13:31:28.209488578 -0800
+++ vendor/k8s.io/klog/v2/klog.go	2023-02-14 13:33:28.081570621 -0800
@@ -401,25 +401,25 @@
 
 // init sets up the defaults and creates command line flags.
 func init() {
-	commandLine.StringVar(&logging.logDir, "log_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
-	commandLine.StringVar(&logging.logFile, "log_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
-	commandLine.Uint64Var(&logging.logFileMaxSizeMB, "log_file_max_size", 1800,
+	commandLine.StringVar(&logging.logDir, "klog_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
+	commandLine.StringVar(&logging.logFile, "klog_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
+	commandLine.Uint64Var(&logging.logFileMaxSizeMB, "klog_file_max_size", 1800,
 		"Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. "+
 		"If the value is 0, the maximum file size is unlimited.")
-	commandLine.BoolVar(&logging.toStderr, "logtostderr", true, "log to standard error instead of files")
-	commandLine.BoolVar(&logging.alsoToStderr, "alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
+	commandLine.BoolVar(&logging.toStderr, "klog_tostderr", true, "log to standard error instead of files")
+	commandLine.BoolVar(&logging.alsoToStderr, "klog_alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
 	logging.setVState(0, nil, false)
-	commandLine.Var(&logging.verbosity, "v", "number for the log level verbosity")
-	commandLine.BoolVar(&logging.addDirHeader, "add_dir_header", false, "If true, adds the file directory to the header of the log messages")
-	commandLine.BoolVar(&logging.skipHeaders, "skip_headers", false, "If true, avoid header prefixes in the log messages")
-	commandLine.BoolVar(&logging.oneOutput, "one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
-	commandLine.BoolVar(&logging.skipLogHeaders, "skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
+	commandLine.Var(&logging.verbosity, "klog_v", "number for the log level verbosity")
+	commandLine.BoolVar(&logging.addDirHeader, "klog_add_dir_header", false, "If true, adds the file directory to the header of the log messages")
+	commandLine.BoolVar(&logging.skipHeaders, "klog_skip_headers", false, "If true, avoid header prefixes in the log messages")
+	commandLine.BoolVar(&logging.oneOutput, "klog_one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
+	commandLine.BoolVar(&logging.skipLogHeaders, "klog_skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
 	logging.stderrThreshold = severityValue{
 		Severity: severity.ErrorLog, // Default stderrThreshold is ERROR.
 	}
-	commandLine.Var(&logging.stderrThreshold, "stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
-	commandLine.Var(&logging.vmodule, "vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
-	commandLine.Var(&logging.traceLocation, "log_backtrace_at", "when logging hits line file:N, emit a stack trace")
+	commandLine.Var(&logging.stderrThreshold, "klog_stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
+	commandLine.Var(&logging.vmodule, "klog_vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
+	commandLine.Var(&logging.traceLocation, "klog_backtrace_at", "when logging hits line file:N, emit a stack trace")
 
 	logging.settings.contextualLoggingEnabled = true
 	logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)
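The vendored patch above renames every flag klog registers (log_dir, logtostderr, v, vmodule, and so on) to a klog_-prefixed variant, presumably so they cannot collide with identically named flags registered elsewhere in the binary. A minimal sketch of the failure mode this avoids (an assumption for illustration, not code from this repository): Go's flag package panics when the same name is registered twice on one FlagSet.

package main

import "flag"

func main() {
	// First registration, e.g. by the application itself.
	flag.String("log_dir", "", "defined by the application")
	// Registering the same name again on the same FlagSet, e.g. when klog's
	// flags are copied into it, panics with "flag redefined: log_dir".
	flag.String("log_dir", "", "defined again")
}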
@@ -1,323 +0,0 @@
package controller

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"strconv"

	"github.com/golang/protobuf/jsonpb"
	v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
	"github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
	v1 "k8s.io/api/core/v1"
	"knative.dev/pkg/apis"
	"knative.dev/pkg/logging"
	reconciler "knative.dev/pkg/reconciler"

	"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
	"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
	"github.com/kubeflow/pipelines/backend/src/v2/driver"
	"github.com/kubeflow/pipelines/backend/src/v2/metadata"
	"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
)

const (
	// ReasonFailedValidation indicates that the reason for the failure status is that the Run failed runtime validation
	ReasonFailedValidation = "RunValidationFailed"

	// ReasonDriverError indicates that an error was thrown while running the driver
	ReasonDriverError = "DriverError"

	// ReasonDriverSuccess indicates that the driver finished successfully
	ReasonDriverSuccess = "DriverSuccess"

	ExecutionID    = "execution-id"
	ExecutorInput  = "executor-input"
	CacheDecision  = "cached-decision"
	Condition      = "condition"
	IterationCount = "iteration-count"
	PodSpecPatch   = "pod-spec-patch"
)

// newReconciledNormal makes a new reconciler event with event type Normal, and reason RunReconciled.
func newReconciledNormal(namespace, name string) reconciler.Event {
	return reconciler.NewEvent(v1.EventTypeNormal, "RunReconciled", "Run reconciled: \"%s/%s\"", namespace, name)
}

// Reconciler implements controller.Reconciler for Run resources.
type Reconciler struct {
}

type driverOptions struct {
	driverType  string
	options     driver.Options
	mlmdClient  *metadata.Client
	cacheClient *cacheutils.Client
}

// Check that our Reconciler implements Interface
var _ customrun.Interface = (*Reconciler)(nil)

// ReconcileKind implements Interface.ReconcileKind.
func (r *Reconciler) ReconcileKind(ctx context.Context, run *v1beta1.CustomRun) reconciler.Event {
	logger := logging.FromContext(ctx)
	logger.Infof("Reconciling Run %s/%s", run.Namespace, run.Name)

	// If the Run has not started, initialize the Condition and set the start time.
	if !run.HasStarted() {
		logger.Infof("Starting new Run %s/%s", run.Namespace, run.Name)
		run.Status.InitializeConditions()
		// In case node time was not synchronized when the controller was scheduled to another node.
		if run.Status.StartTime.Sub(run.CreationTimestamp.Time) < 0 {
			logger.Warnf("Run %s/%s createTimestamp %s is after the Run started %s", run.Namespace, run.Name, run.CreationTimestamp, run.Status.StartTime)
			run.Status.StartTime = &run.CreationTimestamp
		}
	}

	if run.IsDone() {
		logger.Infof("Run %s/%s is done", run.Namespace, run.Name)
		return nil
	}

	options, err := parseParams(run)
	if err != nil {
		logger.Errorf("Run %s/%s is invalid because of %s", run.Namespace, run.Name, err)
		run.Status.MarkCustomRunFailed(ReasonFailedValidation,
			"Run can't be run because it has an invalid param - %v", err)
		return nil
	}

	runResults, driverErr := execDriver(ctx, options)
	if driverErr != nil {
		logger.Errorf("kfp-driver execution failed when reconciling Run %s/%s: %v", run.Namespace, run.Name, driverErr)
		run.Status.MarkCustomRunFailed(ReasonDriverError,
			"kfp-driver execution failed: %v", driverErr)
		return nil
	}

	run.Status.Results = append(run.Status.Results, *runResults...)
	run.Status.MarkCustomRunSucceeded(ReasonDriverSuccess, "kfp-driver finished successfully")
	return newReconciledNormal(run.Namespace, run.Name)
}

func execDriver(ctx context.Context, options *driverOptions) (*[]v1beta1.CustomRunResult, error) {
	var execution *driver.Execution
	var err error
	logger := logging.FromContext(ctx)

	switch options.driverType {
	case "ROOT_DAG":
		execution, err = driver.RootDAG(ctx, options.options, options.mlmdClient)
	case "CONTAINER":
		execution, err = driver.Container(ctx, options.options, options.mlmdClient, options.cacheClient)
	case "DAG":
		execution, err = driver.DAG(ctx, options.options, options.mlmdClient)
	case "DAG-PUB":
		// no-op for now
		return &[]v1beta1.CustomRunResult{}, nil
	default:
		err = fmt.Errorf("unknown driverType %s", options.driverType)
	}

	if err != nil {
		return nil, err
	}

	var runResults []v1beta1.CustomRunResult

	if execution.ID != 0 {
		logger.Infof("output execution.ID=%v", execution.ID)
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  ExecutionID,
			Value: fmt.Sprint(execution.ID),
		})
	}
	if execution.IterationCount != nil {
		count := *execution.IterationCount
		// the count is used as 'to' in PipelineLoop; since PipelineLoop's numeric iteration includes 'to',
		// subtract 1 to compensate for that.
		count = count - 1
		if count < 0 {
			count = 0
		}
		logger.Infof("output execution.IterationCount=%v, count:=%v", *execution.IterationCount, count)
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  IterationCount,
			Value: fmt.Sprint(count),
		})
	}

	logger.Infof("output execution.Condition=%v", execution.Condition)
	if execution.Condition == nil {
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  Condition,
			Value: "",
		})
	} else {
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  Condition,
			Value: strconv.FormatBool(*execution.Condition),
		})
	}

	if execution.ExecutorInput != nil {
		marshaler := jsonpb.Marshaler{}
		executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
		}
		logger.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  ExecutorInput,
			Value: fmt.Sprint(executorInputJSON),
		})
	}
	// seems no need to handle the PodSpecPatch
	if execution.Cached != nil {
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  CacheDecision,
			Value: strconv.FormatBool(*execution.Cached),
		})
	}

	if options.driverType == "CONTAINER" {
		runResults = append(runResults, v1beta1.CustomRunResult{
			Name:  PodSpecPatch,
			Value: execution.PodSpecPatch,
		})
	}

	return &runResults, nil
}

func prettyPrint(jsonStr string) string {
	var prettyJSON bytes.Buffer
	err := json.Indent(&prettyJSON, []byte(jsonStr), "", "  ")
	if err != nil {
		return jsonStr
	}
	return prettyJSON.String()
}

func parseParams(run *v1beta1.CustomRun) (*driverOptions, *apis.FieldError) {
	if len(run.Spec.Params) == 0 {
		return nil, apis.ErrMissingField("params")
	}

	opts := &driverOptions{
		driverType: "",
	}
	opts.options.Namespace = run.Namespace
	mlmdOpts := metadata.ServerConfig{
		Address: "metadata-grpc-service.kubeflow.svc.cluster.local",
		Port:    "8080",
	}

	for _, param := range run.Spec.Params {
		switch param.Name {
		case "type":
			opts.driverType = param.Value.StringVal
		case "pipeline_name":
			opts.options.PipelineName = param.Value.StringVal
		case "run_id":
			opts.options.RunID = param.Value.StringVal
		case "component":
			if param.Value.StringVal != "" {
				componentSpec := &pipelinespec.ComponentSpec{}
				if err := jsonpb.UnmarshalString(param.Value.StringVal, componentSpec); err != nil {
					return nil, apis.ErrInvalidValue(
						fmt.Sprintf("failed to unmarshal component spec, error: %v\ncomponentSpec: %v", err, param.Value.StringVal),
						"component",
					)
				}
				opts.options.Component = componentSpec
			}
		case "task":
			if param.Value.StringVal != "" {
				taskSpec := &pipelinespec.PipelineTaskSpec{}
				if err := jsonpb.UnmarshalString(param.Value.StringVal, taskSpec); err != nil {
					return nil, apis.ErrInvalidValue(
						fmt.Sprintf("failed to unmarshal task spec, error: %v\ntask: %v", err, param.Value.StringVal),
						"task",
					)
				}
				opts.options.Task = taskSpec
			}
		case "runtime_config":
			if param.Value.StringVal != "" {
				runtimeConfig := &pipelinespec.PipelineJob_RuntimeConfig{}
				if err := jsonpb.UnmarshalString(param.Value.StringVal, runtimeConfig); err != nil {
					return nil, apis.ErrInvalidValue(
						fmt.Sprintf("failed to unmarshal runtime config, error: %v\nruntimeConfig: %v", err, param.Value.StringVal),
						"runtime-config",
					)
				}
				opts.options.RuntimeConfig = runtimeConfig
			}
		case "container":
			if param.Value.StringVal != "" {
				containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
				if err := jsonpb.UnmarshalString(param.Value.StringVal, containerSpec); err != nil {
					return nil, apis.ErrInvalidValue(
						fmt.Sprintf("failed to unmarshal container spec, error: %v\ncontainerSpec: %v", err, param.Value.StringVal),
						"container",
					)
				}
				opts.options.Container = containerSpec
			}
		case "iteration_index":
			if param.Value.StringVal != "" {
				tmp, _ := strconv.ParseInt(param.Value.StringVal, 10, 32)
				opts.options.IterationIndex = int(tmp)
			} else {
				opts.options.IterationIndex = -1
			}
		case "dag_execution_id":
			if param.Value.StringVal != "" {
				opts.options.DAGExecutionID, _ = strconv.ParseInt(param.Value.StringVal, 10, 64)
			}
		case "mlmd_server_address":
			mlmdOpts.Address = param.Value.StringVal
		case "mlmd_server_port":
			mlmdOpts.Port = param.Value.StringVal
		case "kubernetes_config":
			var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
			if param.Value.StringVal != "" {
				k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
				if err := jsonpb.UnmarshalString(param.Value.StringVal, k8sExecCfg); err != nil {
					return nil, apis.ErrInvalidValue(
						fmt.Sprintf("failed to unmarshal Kubernetes config, error: %v\nKubernetesConfig: %v", err, param.Value.StringVal),
						"kubernetes_config",
					)
				}
				opts.options.KubernetesExecutorConfig = k8sExecCfg
			}
		}
	}

	// Check all options
	if opts.driverType == "" {
		return nil, apis.ErrMissingField("type")
	}

	if opts.options.RunID == "" {
		return nil, apis.ErrMissingField("run-id")
	}

	if opts.driverType == "ROOT_DAG" && opts.options.RuntimeConfig == nil {
		return nil, apis.ErrMissingField("runtime-config")
	}

	client, err := metadata.NewClient(mlmdOpts.Address, mlmdOpts.Port)
	if err != nil {
		return nil, apis.ErrGeneric(fmt.Sprintf("can't establish MLMD connection: %v", err))
	}
	opts.mlmdClient = client
	cacheClient, err := cacheutils.NewClient()
	if err != nil {
		return nil, apis.ErrGeneric(fmt.Sprintf("can't establish cache connection: %v", err))
	}
	opts.cacheClient = cacheClient
	return opts, nil
}
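For reference, parseParams in the deleted file above requires the "type" and "run_id" params and, for a ROOT_DAG driver, a "runtime_config"; everything else is optional. A minimal sketch of a CustomRun carrying just those params, built with Tekton's v1beta1 types (an assumption for illustration: the values are placeholders and the helper name newRootDAGRun is made up, not taken from this repository):

package example

import (
	v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
)

// newRootDAGRun builds the smallest parameter set the removed parseParams accepted.
func newRootDAGRun() *v1beta1.CustomRun {
	return &v1beta1.CustomRun{
		Spec: v1beta1.CustomRunSpec{
			Params: []v1beta1.Param{
				{Name: "type", Value: *v1beta1.NewStructuredValues("ROOT_DAG")},
				{Name: "run_id", Value: *v1beta1.NewStructuredValues("00000000-0000-0000-0000-000000000000")},
				// ROOT_DAG also needs a runtime config; an empty one satisfies the nil check in parseParams.
				{Name: "runtime_config", Value: *v1beta1.NewStructuredValues(`{"parameters":{}}`)},
			},
		},
	}
}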
@@ -1 +1 @@
2.0.3
2.1.0

@@ -1 +1 @@
2.0.3
2.1.0
@@ -3,15 +3,15 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask
require (
	github.com/golang/protobuf v1.5.3
	github.com/google/uuid v1.3.1
	github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
	github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
	github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
	github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
	github.com/kubeflow/pipelines v0.0.0-20240416215826-da804407ad31
	github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31
	github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240416215826-da804407ad31
	github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
	github.com/stretchr/testify v1.8.4
	github.com/tektoncd/pipeline v0.53.2
	go.uber.org/zap v1.26.0
	google.golang.org/protobuf v1.31.0
	k8s.io/api v0.27.1
	k8s.io/api v0.27.2
	k8s.io/apimachinery v0.27.3
	k8s.io/client-go v0.27.2
	k8s.io/utils v0.0.0-20230505201702-9f6742963106

@@ -27,7 +27,7 @@ require (
	contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
	contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
	github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
	github.com/aws/aws-sdk-go v1.42.50 // indirect
	github.com/aws/aws-sdk-go v1.45.25 // indirect
	github.com/beorn7/perks v1.0.1 // indirect
	github.com/blang/semver/v4 v4.0.0 // indirect
	github.com/blendle/zapdriver v1.3.1 // indirect

@@ -37,7 +37,7 @@ require (
	github.com/emicklei/go-restful/v3 v3.10.2 // indirect
	github.com/evanphx/json-patch v5.6.0+incompatible // indirect
	github.com/evanphx/json-patch/v5 v5.6.0 // indirect
	github.com/go-kit/log v0.2.0 // indirect
	github.com/go-kit/log v0.2.1 // indirect
	github.com/go-logfmt/logfmt v0.5.1 // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect

@@ -45,7 +45,7 @@ require (
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gobuffalo/flect v0.2.4 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/glog v1.1.0 // indirect
	github.com/golang/glog v1.2.0 // indirect
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/google/cel-go v0.12.6 // indirect
	github.com/google/gnostic v0.6.9 // indirect

@@ -75,11 +75,12 @@ require (
	github.com/opencontainers/go-digest v1.0.0 // indirect
	github.com/pkg/errors v0.9.1 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	github.com/prometheus/client_golang v1.14.0 // indirect
	github.com/prometheus/client_golang v1.16.0 // indirect
	github.com/prometheus/client_model v0.4.0 // indirect
	github.com/prometheus/common v0.37.0 // indirect
	github.com/prometheus/procfs v0.8.0 // indirect
	github.com/prometheus/common v0.42.0 // indirect
	github.com/prometheus/procfs v0.10.1 // indirect
	github.com/prometheus/statsd_exporter v0.21.0 // indirect
	github.com/rogpeppe/go-internal v1.12.0 // indirect
	github.com/spf13/pflag v1.0.5 // indirect
	github.com/stoewer/go-strcase v1.2.0 // indirect
	go.opencensus.io v0.24.0 // indirect

@@ -89,7 +90,7 @@ require (
	gocloud.dev v0.22.0 // indirect
	golang.org/x/crypto v0.17.0 // indirect
	golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
	golang.org/x/net v0.17.0 // indirect
	golang.org/x/net v0.19.0 // indirect
	golang.org/x/oauth2 v0.13.0 // indirect
	golang.org/x/sync v0.4.0 // indirect
	golang.org/x/sys v0.15.0 // indirect

@@ -107,7 +108,7 @@ require (
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/apiextensions-apiserver v0.26.5 // indirect
	k8s.io/apiextensions-apiserver v0.27.2 // indirect
	k8s.io/klog/v2 v2.100.1 // indirect
	k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
File diff suppressed because it is too large
@@ -295,6 +295,10 @@ func ParseParams(run *tektonv1beta1.CustomRun) (*driverOptions, *apis.FieldError
	return opts, nil
}

func GetKubernetesExecutorConfig(options *driverOptions) *kubernetesplatform.KubernetesExecutorConfig {
	return options.options.KubernetesExecutorConfig
}

func prettyPrint(jsonStr string) string {
	var prettyJSON bytes.Buffer
	err := json.Indent(&prettyJSON, []byte(jsonStr), "", "  ")

@@ -27,6 +27,7 @@ import (
	kfptaskClient "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/clientset/versioned"
	kfptaskListers "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/listers/kfptask/v1alpha1"
	"github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/common"
	"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
	"github.com/tektoncd/pipeline/pkg/apis/pipeline"
	"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
	tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"

@@ -135,13 +136,13 @@ var annotationToDrop = map[string]string{
}

// transition to the next state based on the current state
func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatch string) error {
func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatch string, executorConfig *kubernetesplatform.KubernetesExecutorConfig) error {
	kts.logger.Infof("kts state is %s", kts.state)
	switch kts.state {
	case StateInit:
		// create the corresponding TaskRun CRD and start the task
		// compose TaskRun
		tr, err := kts.constructTaskRun(executionID, executorInput, podSpecPatch)
		tr, err := kts.constructTaskRun(executionID, executorInput, podSpecPatch, executorConfig)
		if err != nil {
			kts.logger.Infof("Failed to construct a TaskRun:%v", err)
			kts.run.Status.MarkCustomRunFailed(kfptaskv1alpha1.KfpTaskRunReasonInternalError.String(), "Failed to construct a TaskRun: %v", err)

@@ -196,7 +197,47 @@ func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatc
	return nil
}

func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string, podSpecPatch string) (*tektonv1.TaskRun, error) {
// Extends the PodMetadata to include Kubernetes-specific executor config.
// Although the current podMetadata object is always empty, this function
// doesn't overwrite the existing podMetadata because for security reasons
// the existing podMetadata should have higher privilege than the user definition.
func extendPodMetadata(
	podMetadata *metav1.ObjectMeta,
	kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig,
) {
	// Get pod metadata information
	if kubernetesExecutorConfig.GetPodMetadata() != nil {
		if kubernetesExecutorConfig.GetPodMetadata().GetLabels() != nil {
			if podMetadata.Labels == nil {
				podMetadata.Labels = kubernetesExecutorConfig.GetPodMetadata().GetLabels()
			} else {
				podMetadata.Labels = extendMetadataMap(podMetadata.Labels, kubernetesExecutorConfig.GetPodMetadata().GetLabels())
			}
		}
		if kubernetesExecutorConfig.GetPodMetadata().GetAnnotations() != nil {
			if podMetadata.Annotations == nil {
				podMetadata.Annotations = kubernetesExecutorConfig.GetPodMetadata().GetAnnotations()
			} else {
				podMetadata.Annotations = extendMetadataMap(podMetadata.Annotations, kubernetesExecutorConfig.GetPodMetadata().GetAnnotations())
			}
		}
	}
}

// Extends metadata map values; highPriorityMap should overwrite lowPriorityMap values.
// The original map inputs should have higher priority since they are defined by the admin.
// TODO: Use maps.Copy after moving to go 1.21+
func extendMetadataMap(
	highPriorityMap map[string]string,
	lowPriorityMap map[string]string,
) map[string]string {
	for k, v := range highPriorityMap {
		lowPriorityMap[k] = v
	}
	return lowPriorityMap
}

func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string, podSpecPatch string, executorConfig *kubernetesplatform.KubernetesExecutorConfig) (*tektonv1.TaskRun, error) {
	ktSpec, err := kts.reconciler.getKfpTaskSpec(kts.ctx, kts.run)
	if err != nil {
		return nil, err

@@ -235,6 +276,10 @@ func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string,
		},
	}

	if executorConfig != nil {
		extendPodMetadata(&tr.ObjectMeta, executorConfig)
	}

	if podSpecPatch != "" {
		podSpec, err := parseTaskSpecPatch(podSpecPatch)
		if err != nil {

@@ -312,7 +357,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
		return nil
	}
	if ktstate.isRunning() {
		return ktstate.next("", "", "")
		return ktstate.next("", "", "", nil)
	}
	options, err := common.ParseParams(run)
	if err != nil {

@@ -321,6 +366,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
			"Run can't be run because it has an invalid param - %v", err)
		return nil
	}
	executorConfig := common.GetKubernetesExecutorConfig(options)

	runResults, runTask, executionID, executorInput, podSpecPatch, driverErr := common.ExecDriver(ctx, options)
	if driverErr != nil {

@@ -341,7 +387,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
		return nil
	}

	return ktstate.next(executionID, executorInput, podSpecPatch)
	return ktstate.next(executionID, executorInput, podSpecPatch, executorConfig)
}

func (r *Reconciler) FinalizeKind(ctx context.Context, run *tektonv1beta1.CustomRun) reconciler.Event {
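The TODO on extendMetadataMap above points at maps.Copy from the Go 1.21 standard library, which has the same overwrite semantics as the hand-rolled loop. A minimal sketch of that replacement (an assumption for illustration: the merge function name is made up, not code from this repository):

package example

import "maps"

// merge copies every admin-defined (high-priority) entry into the user-supplied
// (low-priority) map, overwriting on key conflicts, which is what extendMetadataMap does today.
func merge(highPriorityMap, lowPriorityMap map[string]string) map[string]string {
	maps.Copy(lowPriorityMap, highPriorityMap) // dst first: the low-priority map receives the high-priority entries
	return lowPriorityMap
}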