Compare commits

...

11 Commits

Author SHA1 Message Date
Tommy Li 0b89419544
chore(owners): Promote rafalbigaj as the approver (#1484)
Signed-off-by: tomcli <tommy.chaoping.li@ibm.com>
2024-04-17 22:13:39 +00:00
Tommy Li 3e7950ffd3
chore(deps): sync kfp deps with the latest commit (#1485)
Signed-off-by: tomcli <tommy.chaoping.li@ibm.com>
2024-04-16 23:32:26 +00:00
Tommy Li bb47bcd892
chore(ci): Add clean up step for tekton ci (#1480) 2024-03-27 17:36:21 +00:00
Tommy Li c0d25310d5
chore(kfp-task): Update driver package to 2.1.0 release (#1478) 2024-03-27 00:11:19 +00:00
Tommy Li b49f959db9
chore(tekton-driver): Update tekton v2 driver to support the latest k8s spec from upstream (#1464)
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
2024-03-06 03:38:07 +00:00
Helber Belmiro 16e781dce9
fix(docs): Updated legal info due to migration from CLA to DCO (#1463)
* Updated legal info due to migration from CLA to DCO

Signed-off-by: hbelmiro <helber.belmiro@gmail.com>

* Fixed TOC

Signed-off-by: hbelmiro <helber.belmiro@gmail.com>

---------

Signed-off-by: hbelmiro <helber.belmiro@gmail.com>
2024-03-05 17:18:07 +00:00
Tommy Li 803377e899
chore(sdk): Add sdk 1.9.3 release (#1462)
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
2024-02-27 23:12:59 +00:00
Tommy Li db6d85ece6
feat(sdk): add verify_ssl flag to support self cert (#1461)
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
2024-02-27 22:52:59 +00:00
Tommy Li 9f568f2a72
feat(ci): Update github actions to also test python 3.12 (#1456)
* Update github actions to also test python 3.12

* Update setup.py

* Update setup.py

* Update kfp-tekton-unittests.yml

* Update kfp-tekton-unittests.yml

* Update kfp-tekton-unittests.yml

* Update kfp-tekton-unittests.yml

* Update README.md
2024-02-13 18:35:05 +00:00
Tommy Li a9d7df96d2
fix(README): Update instructions to only use kustomize (#1455) 2024-02-13 00:07:04 +00:00
Tommy Li b77e6f38d5
Chore(docs) Update kfp_tekton_install V2 instructions for KFP-Tekton 2.0.5 release (#1453) 2024-01-18 17:35:59 +00:00
24 changed files with 194 additions and 4536 deletions

View File

@ -9,14 +9,13 @@ on:
env:
GITHUB_ACTION: "true"
SETUPTOOLS_USE_DISTUTILS: "stdlib"
jobs:
python-unittest:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}

View File

@ -66,6 +66,7 @@ spec:
command: ["/bin/bash", "-c"]
args:
- set -ex;
rm -r /artifacts/*;
cd /artifacts && git clone -q -b $GIT_BRANCH $GIT_URL .;
GIT_COMMIT=$(git rev-parse HEAD);
source ./scripts/deploy/iks/run-test.sh;

View File

@ -9,7 +9,7 @@ just a few small guidelines you need to follow.
<!-- START of ToC generated by running ./tools/mdtoc.sh CONTRIBUTING.md -->
- [Project Structure](#project-structure)
- [Contributor License Agreement](#contributor-license-agreement)
- [Legal](#legal)
- [Coding Style](#coding-style)
- [Unit Testing Best Practices](#unit-testing-best-practices)
- [Golang](#golang)
@ -35,17 +35,11 @@ To get started, see the development guides:
* [Backend development guide](./backend/README.md)
* [SDK development guide](./sdk/python/README.md)
## Contributor License Agreement
## Legal
Contributions to this project must be accompanied by a Contributor License
Agreement. You (or your employer) retain the copyright to your contribution;
this simply gives us permission to use and redistribute your contributions as
part of the project. Head over to <https://cla.developers.google.com/> to see
your current agreements on file or to sign a new one.
Kubeflow uses Developer Certificate of Origin ([DCO](https://github.com/apps/dco/)).
You generally only need to submit a CLA once, so if you've already submitted one
(even if it was for a different project), you probably don't need to do it
again.
Please see https://github.com/kubeflow/community/tree/master/dco-signoff-hook#signing-off-commits to learn how to sign off your commits.
## Coding Style

OWNERS
View File

@ -6,6 +6,7 @@ approvers:
- pugangxa
- scrapcodes
- yhwang
- rafalbigaj
reviewers:
- ckadner
- Tomcli

View File

@ -73,17 +73,12 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
-p '{"data":{"default-timeout-minutes": "0"}}'
```
3. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.2` [custom resource definitions](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)(CRDs).
3. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.2` deployment
```shell
kubectl apply --selector kubeflow/crd-install=true -f https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.2/kfp-tekton.yaml
kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/kfp-template\?ref\=v1.9.2
```
4. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v1.9.2` deployment
```shell
kubectl apply -f https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.2/kfp-tekton.yaml
```
5. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
4. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
```shell
kubectl patch svc ml-pipeline-ui -n kubeflow -p '{"spec": {"type": "LoadBalancer"}}'
```
@ -93,13 +88,13 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
kubectl get svc ml-pipeline-ui -n kubeflow -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
```
6. (GPU worker nodes only) If your Kubernetes cluster has a mixture of CPU and GPU worker nodes, it's recommended to disable the Tekton default affinity assistant so that Tekton won't schedule too many CPU workloads on the GPU nodes.
5. (GPU worker nodes only) If your Kubernetes cluster has a mixture of CPU and GPU worker nodes, it's recommended to disable the Tekton default affinity assistant so that Tekton won't schedule too many CPU workloads on the GPU nodes.
```shell
kubectl patch cm feature-flags -n tekton-pipelines \
-p '{"data":{"disable-affinity-assistant": "true"}}'
```
7. (OpenShift only) If you are running the standalone KFP-Tekton on OpenShift, apply the necessary security context constraint below
6. (OpenShift only) If you are running the standalone KFP-Tekton on OpenShift, apply the necessary security context constraint below
```shell
curl -L https://raw.githubusercontent.com/kubeflow/kfp-tekton/master/install/v1.9.2/kfp-tekton.yaml | yq 'del(.spec.template.spec.containers[].securityContext.runAsUser, .spec.template.spec.containers[].securityContext.runAsGroup)' | oc apply -f -
oc apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/third-party/openshift/standalone
@ -111,9 +106,9 @@ To install the standalone Kubeflow Pipelines V1 with Tekton , run the following
To install the standalone Kubeflow Pipelines V2 with Tekton, run the following steps:
1. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v2.0.3` along with Tekton `v0.47.5`
1. Install Kubeflow Pipelines with Tekton backend (`kfp-tekton`) `v2.0.5` along with Tekton `v0.53.2`
```shell
kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/platform-agnostic-tekton\?ref\=v2.0.3
kubectl apply -k https://github.com/kubeflow/kfp-tekton//manifests/kustomize/env/platform-agnostic-tekton\?ref\=v2.0.5
```
2. Then, if you want to expose the Kubeflow Pipelines endpoint outside the cluster, run the following commands:
@ -135,7 +130,7 @@ To install the standalone Kubeflow Pipelines V2 with Tekton, run the following s
Now, please use the [KFP V2 Python SDK](https://pypi.org/project/kfp/) to compile KFP-Tekton V2 pipelines because we are sharing the same pipeline spec starting from KFP V2.0.0.
```shell
pip install "kfp>=2.0" "kfp-kubernetes>=1.0.0"
pip install "kfp>=2.6.0" "kfp-kubernetes>=1.1.0"
```
## Standalone Kubeflow Pipelines with Openshift Pipelines Backend Deployment

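The README change above pins newer SDK versions (`kfp>=2.6.0`, `kfp-kubernetes>=1.1.0`) for compiling V2 pipelines against the shared KFP V2 pipeline spec. As a minimal sketch that is not part of the diff, assuming the pinned `kfp` SDK is installed and using illustrative component and pipeline names only:

```python
from kfp import compiler, dsl


@dsl.component
def say_hello(name: str) -> str:
    # Lightweight component used only to illustrate compilation.
    return f"Hello, {name}!"


@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "world"):
    say_hello(name=name)


# Compile to the KFP V2 pipeline spec (IR YAML) shared with kfp-tekton V2.
compiler.Compiler().compile(hello_pipeline, package_path="hello_pipeline.yaml")
```

The resulting YAML package can then be uploaded through the Kubeflow Pipelines UI exposed in the installation steps above.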
View File

@ -61,7 +61,7 @@ adding the `TektonCompiler` and the `TektonClient`:
## Project Prerequisites
- Python: `3.8` or later
- Python: `3.8` or later. For Python 3.12, make sure to not have the `SETUPTOOLS_USE_DISTUTILS` flag because it's already [deprecated](https://github.com/pypa/setuptools/issues/4002).
- Tekton: [`v0.53.2`](https://github.com/tektoncd/pipeline/releases/tag/v0.53.2) or [later](https://github.com/tektoncd/pipeline/releases/latest)
- Tekton CLI: [`0.30.1`](https://github.com/tektoncd/cli/releases/tag/v0.30.1)
- Kubeflow Pipelines: [KFP with Tekton backend](/guides/kfp_tekton_install.md)

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = '1.9.2'
__version__ = '1.9.3'
from ._client import TektonClient # noqa F401
from .k8s_client_helper import env_from_secret # noqa F401

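For reference, a quick sketch for confirming which SDK release is installed; it assumes the `kfp_tekton` package from this repository is importable in the current environment:

```python
import kfp_tekton

# Prints the value bumped in the diff above, e.g. "1.9.3".
print(kfp_tekton.__version__)
```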
View File

@ -16,6 +16,7 @@
import os
import tempfile
import time
import warnings
import datetime
from typing import Mapping, Callable, Optional
@ -27,6 +28,7 @@ from kfp_tekton_server_api import ApiException
from .compiler import TektonCompiler
from .compiler.pipeline_utils import TektonPipelineConf
from kfp._auth import get_auth_token, get_gcp_access_token
import json
import logging
@ -106,7 +108,8 @@ class TektonClient(kfp.Client):
ssl_ca_cert=None,
kube_context=None,
credentials=None,
ui_host=None):
ui_host=None,
verify_ssl=None):
"""Create a new instance of kfp client."""
host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV)
self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, ui_host or
@ -120,7 +123,7 @@ class TektonClient(kfp.Client):
config = self._load_config(host, client_id, namespace, other_client_id,
other_client_secret, existing_token, proxy,
ssl_ca_cert, kube_context, credentials)
ssl_ca_cert, kube_context, credentials, verify_ssl)
# Save the loaded API client configuration, as a reference if update is
# needed.
self._load_context_setting_or_default()
@ -162,7 +165,106 @@ class TektonClient(kfp.Client):
except FileNotFoundError:
logging.info(
'Failed to automatically set namespace.', exc_info=False)
def _load_config(self, host, client_id, namespace, other_client_id,
other_client_secret, existing_token, proxy, ssl_ca_cert,
kube_context, credentials, verify_ssl):
config = kfp_server_api.configuration.Configuration()
if proxy:
# https://github.com/kubeflow/pipelines/blob/c6ac5e0b1fd991e19e96419f0f508ec0a4217c29/backend/api/python_http_client/kfp_server_api/rest.py#L100
config.proxy = proxy
if verify_ssl is not None:
config.verify_ssl = verify_ssl
if ssl_ca_cert:
config.ssl_ca_cert = ssl_ca_cert
host = host or ''
# Defaults to 'https' if host does not contain 'http' or 'https' protocol.
if host and not host.startswith('http'):
warnings.warn(
'The host %s does not contain the "http" or "https" protocol.'
' Defaults to "https".' % host)
host = 'https://' + host
# Preprocess the host endpoint to prevent some common user mistakes.
if not client_id:
# always preserving the protocol (http://localhost requires it)
host = host.rstrip('/')
if host:
config.host = host
token = None
# "existing_token" is designed to accept token generated outside of SDK. Here is an example.
#
# https://cloud.google.com/functions/docs/securing/function-identity
# https://cloud.google.com/endpoints/docs/grpc/service-account-authentication
#
# import requests
# import kfp
#
# def get_access_token():
# url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token'
# r = requests.get(url, headers={'Metadata-Flavor': 'Google'})
# r.raise_for_status()
# access_token = r.json()['access_token']
# return access_token
#
# client = kfp.Client(host='<KFPHost>', existing_token=get_access_token())
#
if existing_token:
token = existing_token
self._is_refresh_token = False
elif client_id:
token = get_auth_token(client_id, other_client_id,
other_client_secret)
self._is_refresh_token = True
elif self._is_inverse_proxy_host(host):
token = get_gcp_access_token()
self._is_refresh_token = False
elif credentials:
config.api_key['authorization'] = 'placeholder'
config.api_key_prefix['authorization'] = 'Bearer'
config.refresh_api_key_hook = credentials.refresh_api_key_hook
if token:
config.api_key['authorization'] = token
config.api_key_prefix['authorization'] = 'Bearer'
return config
if host:
# if host is explicitly set with auth token, it's probably a port forward address.
return config
import kubernetes as k8s
in_cluster = True
try:
k8s.config.load_incluster_config()
except:
in_cluster = False
pass
if in_cluster:
config.host = TektonClient.IN_CLUSTER_DNS_NAME.format(namespace)
config = self._get_config_with_default_credentials(config)
return config
try:
k8s.config.load_kube_config(
client_configuration=config, context=kube_context)
except:
print('Failed to load kube config.')
return config
if config.host:
config.host = config.host + '/' + TektonClient.KUBE_PROXY_PATH.format(
namespace)
return config
def wait_for_run_completion(self, run_id: str, timeout: int):
"""Waits for a run to complete.

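A minimal usage sketch of the `verify_ssl` flag that the `TektonClient._load_config` change above introduces; the host URL is a placeholder, and disabling verification is intended only for endpoints with self-signed certificates:

```python
from kfp_tekton import TektonClient

# Placeholder endpoint; replace with your KFP-Tekton API host.
# verify_ssl=False skips TLS certificate verification, the self-signed
# certificate case targeted by #1461. Passing ssl_ca_cert remains the
# safer option when the CA bundle is available.
client = TektonClient(
    host="https://kfp.example.com/pipeline",
    verify_ssl=False,
)

print(client.list_experiments())
```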
View File

@ -20,14 +20,14 @@
#
# To create a distribution for PyPi run:
#
# $ export KFP_TEKTON_VERSION=1.9.2-rc1
# $ export KFP_TEKTON_VERSION=1.9.3-rc1
# $ python3 setup.py sdist
# $ twine check dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
# $ twine upload --repository pypi dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
#
# ... or:
#
# $ make distribution KFP_TEKTON_VERSION=1.9.2-rc1
# $ make distribution KFP_TEKTON_VERSION=1.9.3-rc1
#
# =============================================================================
@ -61,7 +61,8 @@ logger.setLevel(logging.INFO)
REQUIRES = [
"kfp>=1.8.10,<1.8.23",
"kfp-tekton-server-api==1.8.0rc8",
"PyYAML>=6,<7"
"PyYAML>=6,<7",
"setuptools>=69.1"
]
TESTS_REQUIRE = [

View File

@ -1 +1 @@
1.8.1
1.9.2

View File

@ -8,12 +8,12 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-00010101000000-000000000000
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20231127195001-a75d4b3711ff
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.26.0
gomodules.xyz/jsonpatch/v2 v2.4.0
k8s.io/api v0.27.1
k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2
k8s.io/utils v0.0.0-20230505201702-9f6742963106
@ -30,7 +30,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/IBM/ibm-cos-sdk-go v1.8.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect
github.com/aws/aws-sdk-go v1.45.25 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
@ -41,16 +41,16 @@ require (
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/cel-go v0.12.6 // indirect
@ -74,21 +74,21 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03 // indirect
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03 // indirect
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03 // indirect
github.com/kubeflow/pipelines v0.0.0-20240416215826-da804407ad31 // indirect
github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31 // indirect
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240416215826-da804407ad31 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/mattn/go-sqlite3 v1.14.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
@ -100,7 +100,7 @@ require (
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.15.0 // indirect
@ -122,8 +122,8 @@ require (
gorm.io/driver/mysql v1.4.3 // indirect
gorm.io/driver/sqlite v1.4.2 // indirect
gorm.io/gorm v1.24.0 // indirect
k8s.io/apiextensions-apiserver v0.26.5 // indirect
k8s.io/code-generator v0.26.5 // indirect
k8s.io/apiextensions-apiserver v0.27.2 // indirect
k8s.io/code-generator v0.27.2 // indirect
k8s.io/gengo v0.0.0-20221011193443-fad74ee6edd9 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect

View File

@ -1,34 +0,0 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.20.4-alpine3.17 as builder
WORKDIR /go/src/github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver
COPY . .
# Needed musl-dev for github.com/mattn/go-sqlite3
RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev patch
RUN go mod vendor && patch -u vendor/k8s.io/klog/v2/klog.go pkg/controller/klog.patch
RUN CGO_ENABLED=0 GO111MODULE=on go build -mod=vendor -o /bin/controller cmd/controller/*.go && rm -rf vendor
FROM alpine:3.17
WORKDIR /bin
COPY --from=builder /bin/controller /bin/controller
RUN chmod +x /bin/controller
RUN apk --no-cache add tzdata
CMD /bin/controller

View File

@ -1 +0,0 @@
2.0.3

View File

@ -1,26 +0,0 @@
/*
// Copyright 2023 kubeflow.org
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/
package main
import (
"github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver/pkg/controller"
"knative.dev/pkg/injection/sharedmain"
)
func main() {
sharedmain.Main(controller.ControllerName, controller.NewController)
}

View File

@ -1,260 +0,0 @@
// Copyright 2021-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/config"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
)
const (
driverTypeArg = "type"
)
var (
// inputs
driverType = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
pipelineName = flag.String("pipeline_name", "", "pipeline context name")
runID = flag.String("run_id", "", "pipeline run uid")
componentSpecJson = flag.String("component", "{}", "component spec")
taskSpecJson = flag.String("task", "", "task spec")
runtimeConfigJson = flag.String("runtime_config", "", "jobruntime config")
iterationIndex = flag.Int("iteration_index", -1, "iteration index, -1 means not an interation")
// container inputs
dagExecutionID = flag.Int64("dag_execution_id", 0, "DAG execution ID")
containerSpecJson = flag.String("container", "{}", "container spec")
k8sExecConfigJson = flag.String("kubernetes_config", "{}", "kubernetes executor config")
// config
mlmdServerAddress = flag.String("mlmd_server_address", "", "MLMD server address")
mlmdServerPort = flag.String("mlmd_server_port", "", "MLMD server port")
// output paths
executionIDPath = flag.String("execution_id_path", "", "Exeucution ID output path")
iterationCountPath = flag.String("iteration_count_path", "", "Iteration Count output path")
podSpecPatchPath = flag.String("pod_spec_patch_path", "", "Pod Spec Patch output path")
// the value stored in the paths will be either 'true' or 'false'
cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
conditionPath = flag.String("condition_path", "", "Condition output path")
)
// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {
func main() {
flag.Parse()
err := drive()
if err != nil {
glog.Exitf("%v", err)
}
}
// Use WARNING default logging level to facilitate troubleshooting.
func init() {
flag.Set("logtostderr", "true")
// Change the WARNING to INFO level for debugging.
flag.Set("stderrthreshold", "WARNING")
}
func validate() error {
if *driverType == "" {
return fmt.Errorf("argument --%s must be specified", driverTypeArg)
}
// validation responsibility lives in driver itself, so we do not validate all other args
return nil
}
func drive() (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("KFP driver: %w", err)
}
}()
ctx := context.Background()
if err = validate(); err != nil {
return err
}
glog.Infof("input ComponentSpec:%s\n", prettyPrint(*componentSpecJson))
componentSpec := &pipelinespec.ComponentSpec{}
if err := jsonpb.UnmarshalString(*componentSpecJson, componentSpec); err != nil {
return fmt.Errorf("failed to unmarshal component spec, error: %w\ncomponentSpec: %v", err, prettyPrint(*componentSpecJson))
}
var taskSpec *pipelinespec.PipelineTaskSpec
if *taskSpecJson != "" {
glog.Infof("input TaskSpec:%s\n", prettyPrint(*taskSpecJson))
taskSpec = &pipelinespec.PipelineTaskSpec{}
if err := jsonpb.UnmarshalString(*taskSpecJson, taskSpec); err != nil {
return fmt.Errorf("failed to unmarshal task spec, error: %w\ntask: %v", err, taskSpecJson)
}
}
glog.Infof("input ContainerSpec:%s\n", prettyPrint(*containerSpecJson))
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
if err := jsonpb.UnmarshalString(*containerSpecJson, containerSpec); err != nil {
return fmt.Errorf("failed to unmarshal container spec, error: %w\ncontainerSpec: %v", err, containerSpecJson)
}
var runtimeConfig *pipelinespec.PipelineJob_RuntimeConfig
if *runtimeConfigJson != "" {
glog.Infof("input RuntimeConfig:%s\n", prettyPrint(*runtimeConfigJson))
runtimeConfig = &pipelinespec.PipelineJob_RuntimeConfig{}
if err := jsonpb.UnmarshalString(*runtimeConfigJson, runtimeConfig); err != nil {
return fmt.Errorf("failed to unmarshal runtime config, error: %w\nruntimeConfig: %v", err, runtimeConfigJson)
}
}
var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
if *k8sExecConfigJson != "" {
glog.Infof("input kubernetesConfig:%s\n", prettyPrint(*k8sExecConfigJson))
k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(*k8sExecConfigJson, k8sExecCfg); err != nil {
return fmt.Errorf("failed to unmarshal Kubernetes config, error: %w\nKubernetesConfig: %v", err, k8sExecConfigJson)
}
}
namespace, err := config.InPodNamespace()
if err != nil {
return err
}
client, err := newMlmdClient()
if err != nil {
return err
}
cacheClient, err := cacheutils.NewClient()
if err != nil {
return err
}
options := driver.Options{
PipelineName: *pipelineName,
RunID: *runID,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
}
var execution *driver.Execution
var driverErr error
switch *driverType {
case "ROOT_DAG":
options.RuntimeConfig = runtimeConfig
execution, driverErr = driver.RootDAG(ctx, options, client)
case "DAG":
execution, driverErr = driver.DAG(ctx, options, client)
case "CONTAINER":
options.Container = containerSpec
options.KubernetesExecutorConfig = k8sExecCfg
execution, driverErr = driver.Container(ctx, options, client, cacheClient)
default:
err = fmt.Errorf("unknown driverType %s", *driverType)
}
if driverErr != nil {
if execution == nil {
return driverErr
}
defer func() {
// Override error with driver error, because driver error is more important.
// However, we continue running, because the following code prints debug info that
// may be helpful for figuring out why this failed.
err = driverErr
}()
}
if execution.ID != 0 {
glog.Infof("output execution.ID=%v", execution.ID)
if *executionIDPath != "" {
if err = writeFile(*executionIDPath, []byte(fmt.Sprint(execution.ID))); err != nil {
return fmt.Errorf("failed to write execution ID to file: %w", err)
}
}
}
if execution.IterationCount != nil {
if err = writeFile(*iterationCountPath, []byte(fmt.Sprintf("%v", *execution.IterationCount))); err != nil {
return fmt.Errorf("failed to write iteration count to file: %w", err)
}
}
if execution.Cached != nil {
if err = writeFile(*cachedDecisionPath, []byte(strconv.FormatBool(*execution.Cached))); err != nil {
return fmt.Errorf("failed to write cached decision to file: %w", err)
}
}
if execution.Condition != nil {
if err = writeFile(*conditionPath, []byte(strconv.FormatBool(*execution.Condition))); err != nil {
return fmt.Errorf("failed to write condition to file: %w", err)
}
}
if execution.PodSpecPatch != "" {
glog.Infof("output podSpecPatch=\n%s\n", execution.PodSpecPatch)
if *podSpecPatchPath == "" {
return fmt.Errorf("--pod_spec_patch_path is required for container executor drivers")
}
if err = writeFile(*podSpecPatchPath, []byte(execution.PodSpecPatch)); err != nil {
return fmt.Errorf("failed to write pod spec patch to file: %w", err)
}
}
if execution.ExecutorInput != nil {
marshaler := jsonpb.Marshaler{}
executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
if err != nil {
return fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
}
glog.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
}
return nil
}
func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
if err != nil {
return jsonStr
}
return prettyJSON.String()
}
func writeFile(path string, data []byte) (err error) {
if path == "" {
return fmt.Errorf("path is not specified")
}
defer func() {
if err != nil {
err = fmt.Errorf("failed to write to %s: %w", path, err)
}
}()
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return err
}
return ioutil.WriteFile(path, data, 0o644)
}
func newMlmdClient() (*metadata.Client, error) {
mlmdConfig := metadata.DefaultConfig()
if *mlmdServerAddress != "" && *mlmdServerPort != "" {
mlmdConfig.Address = *mlmdServerAddress
mlmdConfig.Port = *mlmdServerPort
}
return metadata.NewClient(mlmdConfig.Address, mlmdConfig.Port)
}

View File

@ -1,123 +0,0 @@
module github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-driver
require (
github.com/golang/glog v1.1.0
github.com/golang/protobuf v1.5.3
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
github.com/tektoncd/pipeline v0.50.2
k8s.io/api v0.27.1
k8s.io/client-go v0.27.2
knative.dev/pkg v0.0.0-20231011201526-df28feae6d34
)
require (
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
cloud.google.com/go/storage v1.29.0 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.12.5 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-containerregistry v0.15.2 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/wire v0.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
gocloud.dev v0.22.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/api v0.128.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.25.4 // indirect
k8s.io/apimachinery v0.27.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
k8s.io/utils v0.0.0-20230505201702-9f6742963106 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace (
k8s.io/api => k8s.io/api v0.25.9
k8s.io/apimachinery => k8s.io/apimachinery v0.26.5
k8s.io/client-go => k8s.io/client-go v0.25.9
k8s.io/code-generator => k8s.io/code-generator v0.25.9
k8s.io/kubernetes => k8s.io/kubernetes v1.11.1
sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.2.9
)
go 1.19

File diff suppressed because it is too large.

View File

@ -1,43 +0,0 @@
package controller
import (
context "context"
runInformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/customrun"
customrun "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
tkncontroller "github.com/tektoncd/pipeline/pkg/controller"
"k8s.io/client-go/tools/cache"
configmap "knative.dev/pkg/configmap"
controller "knative.dev/pkg/controller"
logging "knative.dev/pkg/logging"
)
const (
ControllerName = "kfp-driver"
apiVersion = "kfp-driver.tekton.dev/v1alpha1"
kind = "KFPDriver"
)
// NewController creates a Reconciler for Run and returns the result of NewImpl.
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
runInformer := runInformer.Get(ctx)
r := &Reconciler{}
impl := customrun.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
AgentName: ControllerName,
}
})
logger.Info("Setting up event handlers")
runInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: tkncontroller.FilterCustomRunRef(apiVersion, kind),
Handler: controller.HandleAll(impl.Enqueue),
})
return impl
}

View File

@ -1,41 +0,0 @@
--- klog.go 2023-02-14 13:31:28.209488578 -0800
+++ vendor/k8s.io/klog/v2/klog.go 2023-02-14 13:33:28.081570621 -0800
@@ -401,25 +401,25 @@
// init sets up the defaults and creates command line flags.
func init() {
- commandLine.StringVar(&logging.logDir, "log_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
- commandLine.StringVar(&logging.logFile, "log_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
- commandLine.Uint64Var(&logging.logFileMaxSizeMB, "log_file_max_size", 1800,
+ commandLine.StringVar(&logging.logDir, "klog_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)")
+ commandLine.StringVar(&logging.logFile, "klog_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)")
+ commandLine.Uint64Var(&logging.logFileMaxSizeMB, "klog_file_max_size", 1800,
"Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. "+
"If the value is 0, the maximum file size is unlimited.")
- commandLine.BoolVar(&logging.toStderr, "logtostderr", true, "log to standard error instead of files")
- commandLine.BoolVar(&logging.alsoToStderr, "alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
+ commandLine.BoolVar(&logging.toStderr, "klog_tostderr", true, "log to standard error instead of files")
+ commandLine.BoolVar(&logging.alsoToStderr, "klog_alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)")
logging.setVState(0, nil, false)
- commandLine.Var(&logging.verbosity, "v", "number for the log level verbosity")
- commandLine.BoolVar(&logging.addDirHeader, "add_dir_header", false, "If true, adds the file directory to the header of the log messages")
- commandLine.BoolVar(&logging.skipHeaders, "skip_headers", false, "If true, avoid header prefixes in the log messages")
- commandLine.BoolVar(&logging.oneOutput, "one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
- commandLine.BoolVar(&logging.skipLogHeaders, "skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
+ commandLine.Var(&logging.verbosity, "klog_v", "number for the log level verbosity")
+ commandLine.BoolVar(&logging.addDirHeader, "klog_add_dir_header", false, "If true, adds the file directory to the header of the log messages")
+ commandLine.BoolVar(&logging.skipHeaders, "klog_skip_headers", false, "If true, avoid header prefixes in the log messages")
+ commandLine.BoolVar(&logging.oneOutput, "klog_one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)")
+ commandLine.BoolVar(&logging.skipLogHeaders, "klog_skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)")
logging.stderrThreshold = severityValue{
Severity: severity.ErrorLog, // Default stderrThreshold is ERROR.
}
- commandLine.Var(&logging.stderrThreshold, "stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
- commandLine.Var(&logging.vmodule, "vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
- commandLine.Var(&logging.traceLocation, "log_backtrace_at", "when logging hits line file:N, emit a stack trace")
+ commandLine.Var(&logging.stderrThreshold, "klog_stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)")
+ commandLine.Var(&logging.vmodule, "klog_vmodule", "comma-separated list of pattern=N settings for file-filtered logging")
+ commandLine.Var(&logging.traceLocation, "klog_backtrace_at", "when logging hits line file:N, emit a stack trace")
logging.settings.contextualLoggingEnabled = true
logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)

View File

@ -1,323 +0,0 @@
package controller
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/golang/protobuf/jsonpb"
v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
v1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
reconciler "knative.dev/pkg/reconciler"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
)
const (
// ReasonFailedValidation indicates that the reason for failure status is that Run failed runtime validation
ReasonFailedValidation = "RunValidationFailed"
// ReasonDriverError indicates that an error is throw while running the driver
ReasonDriverError = "DriverError"
// ReasonDriverSuccess indicates that driver finished successfully
ReasonDriverSuccess = "DriverSuccess"
ExecutionID = "execution-id"
ExecutorInput = "executor-input"
CacheDecision = "cached-decision"
Condition = "condition"
IterationCount = "iteration-count"
PodSpecPatch = "pod-spec-patch"
)
// newReconciledNormal makes a new reconciler event with event type Normal, and reason RunReconciled.
func newReconciledNormal(namespace, name string) reconciler.Event {
return reconciler.NewEvent(v1.EventTypeNormal, "RunReconciled", "Run reconciled: \"%s/%s\"", namespace, name)
}
// Reconciler implements controller.Reconciler for Run resources.
type Reconciler struct {
}
type driverOptions struct {
driverType string
options driver.Options
mlmdClient *metadata.Client
cacheClient *cacheutils.Client
}
// Check that our Reconciler implements Interface
var _ customrun.Interface = (*Reconciler)(nil)
// ReconcileKind implements Interface.ReconcileKind.
func (r *Reconciler) ReconcileKind(ctx context.Context, run *v1beta1.CustomRun) reconciler.Event {
logger := logging.FromContext(ctx)
logger.Infof("Reconciling Run %s/%s", run.Namespace, run.Name)
// If the Run has not started, initialize the Condition and set the start time.
if !run.HasStarted() {
logger.Infof("Starting new Run %s/%s", run.Namespace, run.Name)
run.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if run.Status.StartTime.Sub(run.CreationTimestamp.Time) < 0 {
logger.Warnf("Run %s/%s createTimestamp %s is after the Run started %s", run.Namespace, run.Name, run.CreationTimestamp, run.Status.StartTime)
run.Status.StartTime = &run.CreationTimestamp
}
}
if run.IsDone() {
logger.Infof("Run %s/%s is done", run.Namespace, run.Name)
return nil
}
options, err := parseParams(run)
if err != nil {
logger.Errorf("Run %s/%s is invalid because of %s", run.Namespace, run.Name, err)
run.Status.MarkCustomRunFailed(ReasonFailedValidation,
"Run can't be run because it has an invalid param - %v", err)
return nil
}
runResults, driverErr := execDriver(ctx, options)
if driverErr != nil {
logger.Errorf("kfp-driver execution failed when reconciling Run %s/%s: %v", run.Namespace, run.Name, driverErr)
run.Status.MarkCustomRunFailed(ReasonDriverError,
"kfp-driver execution failed: %v", driverErr)
return nil
}
run.Status.Results = append(run.Status.Results, *runResults...)
run.Status.MarkCustomRunSucceeded(ReasonDriverSuccess, "kfp-driver finished successfully")
return newReconciledNormal(run.Namespace, run.Name)
}
func execDriver(ctx context.Context, options *driverOptions) (*[]v1beta1.CustomRunResult, error) {
var execution *driver.Execution
var err error
logger := logging.FromContext(ctx)
switch options.driverType {
case "ROOT_DAG":
execution, err = driver.RootDAG(ctx, options.options, options.mlmdClient)
case "CONTAINER":
execution, err = driver.Container(ctx, options.options, options.mlmdClient, options.cacheClient)
case "DAG":
execution, err = driver.DAG(ctx, options.options, options.mlmdClient)
case "DAG-PUB":
// no-op for now
return &[]v1beta1.CustomRunResult{}, nil
default:
err = fmt.Errorf("unknown driverType %s", options.driverType)
}
if err != nil {
return nil, err
}
var runResults []v1beta1.CustomRunResult
if execution.ID != 0 {
logger.Infof("output execution.ID=%v", execution.ID)
runResults = append(runResults, v1beta1.CustomRunResult{
Name: ExecutionID,
Value: fmt.Sprint(execution.ID),
})
}
if execution.IterationCount != nil {
count := *execution.IterationCount
// the count would be use as 'to' in PipelineLoop. since PipelineLoop's numberic iteration includes to,
// need to substract 1 to compensate that.
count = count - 1
if count < 0 {
count = 0
}
logger.Infof("output execution.IterationCount=%v, count:=%v", *execution.IterationCount, count)
runResults = append(runResults, v1beta1.CustomRunResult{
Name: IterationCount,
Value: fmt.Sprint(count),
})
}
logger.Infof("output execution.Condition=%v", execution.Condition)
if execution.Condition == nil {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: Condition,
Value: "",
})
} else {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: Condition,
Value: strconv.FormatBool(*execution.Condition),
})
}
if execution.ExecutorInput != nil {
marshaler := jsonpb.Marshaler{}
executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
if err != nil {
return nil, fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
}
logger.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
runResults = append(runResults, v1beta1.CustomRunResult{
Name: ExecutorInput,
Value: fmt.Sprint(executorInputJSON),
})
}
// seems no need to handle the PodSpecPatch
if execution.Cached != nil {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: CacheDecision,
Value: strconv.FormatBool(*execution.Cached),
})
}
if options.driverType == "CONTAINER" {
runResults = append(runResults, v1beta1.CustomRunResult{
Name: PodSpecPatch,
Value: execution.PodSpecPatch,
})
}
return &runResults, nil
}
func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
if err != nil {
return jsonStr
}
return prettyJSON.String()
}
func parseParams(run *v1beta1.CustomRun) (*driverOptions, *apis.FieldError) {
if len(run.Spec.Params) == 0 {
return nil, apis.ErrMissingField("params")
}
opts := &driverOptions{
driverType: "",
}
opts.options.Namespace = run.Namespace
mlmdOpts := metadata.ServerConfig{
Address: "metadata-grpc-service.kubeflow.svc.cluster.local",
Port: "8080",
}
for _, param := range run.Spec.Params {
switch param.Name {
case "type":
opts.driverType = param.Value.StringVal
case "pipeline_name":
opts.options.PipelineName = param.Value.StringVal
case "run_id":
opts.options.RunID = param.Value.StringVal
case "component":
if param.Value.StringVal != "" {
componentSpec := &pipelinespec.ComponentSpec{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, componentSpec); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal component spec, error: %v\ncomponentSpec: %v", err, param.Value.StringVal),
"component",
)
}
opts.options.Component = componentSpec
}
case "task":
if param.Value.StringVal != "" {
taskSpec := &pipelinespec.PipelineTaskSpec{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, taskSpec); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal task spec, error: %v\ntask: %v", err, param.Value.StringVal),
"task",
)
}
opts.options.Task = taskSpec
}
case "runtime_config":
if param.Value.StringVal != "" {
runtimeConfig := &pipelinespec.PipelineJob_RuntimeConfig{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, runtimeConfig); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal runtime config, error: %v\nruntimeConfig: %v", err, param.Value.StringVal),
"runtime-config",
)
}
opts.options.RuntimeConfig = runtimeConfig
}
case "container":
if param.Value.StringVal != "" {
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, containerSpec); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal container spec, error: %v\ncontainerSpec: %v", err, param.Value.StringVal),
"container",
)
}
opts.options.Container = containerSpec
}
case "iteration_index":
if param.Value.StringVal != "" {
tmp, _ := strconv.ParseInt(param.Value.StringVal, 10, 32)
opts.options.IterationIndex = int(tmp)
} else {
opts.options.IterationIndex = -1
}
case "dag_execution_id":
if param.Value.StringVal != "" {
opts.options.DAGExecutionID, _ = strconv.ParseInt(param.Value.StringVal, 10, 64)
}
case "mlmd_server_address":
mlmdOpts.Address = param.Value.StringVal
case "mlmd_server_port":
mlmdOpts.Port = param.Value.StringVal
case "kubernetes_config":
var k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
if param.Value.StringVal != "" {
k8sExecCfg = &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(param.Value.StringVal, k8sExecCfg); err != nil {
return nil, apis.ErrInvalidValue(
fmt.Sprintf("failed to unmarshal Kubernetes config, error: %v\nKubernetesConfig: %v", err, param.Value.StringVal),
"kubernetes_config",
)
}
opts.options.KubernetesExecutorConfig = k8sExecCfg
}
}
}
//Check all options
if opts.driverType == "" {
return nil, apis.ErrMissingField("type")
}
if opts.options.RunID == "" {
return nil, apis.ErrMissingField("run-id")
}
if opts.driverType == "ROOT_DAG" && opts.options.RuntimeConfig == nil {
return nil, apis.ErrMissingField("runtime-config")
}
client, err := metadata.NewClient(mlmdOpts.Address, mlmdOpts.Port)
if err != nil {
return nil, apis.ErrGeneric(fmt.Sprintf("can't estibalish MLMD connection: %v", err))
}
opts.mlmdClient = client
cacheClient, err := cacheutils.NewClient()
if err != nil {
return nil, apis.ErrGeneric(fmt.Sprintf("can't estibalish cache connection: %v", err))
}
opts.cacheClient = cacheClient
return opts, nil
}

View File

@ -1 +1 @@
2.0.3
2.1.0

View File

@ -1 +1 @@
2.0.3
2.1.0

View File

@ -3,15 +3,15 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask
require (
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.1
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240111221413-aac4408237df
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
github.com/stretchr/testify v1.8.4
github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
k8s.io/api v0.27.1
k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2
k8s.io/utils v0.0.0-20230505201702-9f6742963106
@ -27,7 +27,7 @@ require (
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/aws/aws-sdk-go v1.42.50 // indirect
github.com/aws/aws-sdk-go v1.45.25 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
@ -37,7 +37,7 @@ require (
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
@ -45,7 +45,7 @@ require (
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.12.6 // indirect
github.com/google/gnostic v0.6.9 // indirect
@ -75,11 +75,12 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
@ -89,7 +90,7 @@ require (
gocloud.dev v0.22.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.15.0 // indirect
@ -107,7 +108,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.5 // indirect
k8s.io/apiextensions-apiserver v0.27.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230515203736-54b630e78af5 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect

File diff suppressed because it is too large.