This commit is contained in:
parent
d68231dd9c
commit
4810b7aac7
|
|
@ -15,8 +15,11 @@
|
|||
package template
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
|
@ -28,6 +31,7 @@ import (
|
|||
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
structpb "google.golang.org/protobuf/types/known/structpb"
|
||||
goyaml "gopkg.in/yaml.v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
|
@ -58,13 +62,34 @@ func inferTemplateFormat(template []byte) TemplateType {
|
|||
return Unknown
|
||||
case isArgoWorkflow(template):
|
||||
return V1
|
||||
case isPipelineSpec(template):
|
||||
case isV2Spec(template):
|
||||
return V2
|
||||
default:
|
||||
return Unknown
|
||||
}
|
||||
}
|
||||
|
||||
// isV2Spec returns whether template contains api/v2alpha1/PipelineSpec format.
|
||||
func isV2Spec(template []byte) bool {
|
||||
decoder := goyaml.NewDecoder(bytes.NewReader(template))
|
||||
for {
|
||||
var value map[string]interface{}
|
||||
|
||||
err := decoder.Decode(&value)
|
||||
// Break at end of file
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
if isPipelineSpec(value) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// isArgoWorkflow returns whether template is in argo workflow spec format.
|
||||
func isArgoWorkflow(template []byte) bool {
|
||||
var meta metav1.TypeMeta
|
||||
|
|
@ -76,13 +101,13 @@ func isArgoWorkflow(template []byte) bool {
|
|||
}
|
||||
|
||||
// isPipelineSpec returns whether template is in KFP api/v2alpha1/PipelineSpec format.
|
||||
func isPipelineSpec(template []byte) bool {
|
||||
var spec pipelinespec.PipelineSpec
|
||||
templateJson, err := yaml.YAMLToJSON(template)
|
||||
func isPipelineSpec(value map[string]interface{}) bool {
|
||||
jsonData, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
err = protojson.Unmarshal(templateJson, &spec)
|
||||
var spec pipelinespec.PipelineSpec
|
||||
err = protojson.Unmarshal(jsonData, &spec)
|
||||
return err == nil && spec.GetPipelineInfo().GetName() != "" && spec.GetRoot() != nil
|
||||
}
|
||||
|
||||
|
|
@ -111,11 +136,6 @@ type RunWorkflowOptions struct {
|
|||
RunAt int64
|
||||
}
|
||||
|
||||
func NewFromString(s string) (Template, error) {
|
||||
bytes := []byte(s)
|
||||
return New(bytes)
|
||||
}
|
||||
|
||||
func New(bytes []byte) (Template, error) {
|
||||
format := inferTemplateFormat(bytes)
|
||||
switch format {
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
package template
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"testing"
|
||||
|
|
@ -29,6 +30,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
goyaml "gopkg.in/yaml.v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
|
@ -105,7 +107,7 @@ kind: CronWorkflow`,
|
|||
for _, test := range tt {
|
||||
format := inferTemplateFormat([]byte(test.template))
|
||||
if format != test.templateType {
|
||||
t.Errorf("InferSpecFormat(%s)=%q, expect %q", test.template, format, test.templateType)
|
||||
t.Errorf("InferTemplateFormat(%s)=%q, expect %q", test.template, format, test.templateType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -190,7 +192,7 @@ func TestScheduledWorkflow(t *testing.T) {
|
|||
PipelineName: "pipeline name",
|
||||
PipelineSpecManifest: v2SpecHelloWorldYAML,
|
||||
RuntimeConfig: model.RuntimeConfig{
|
||||
Parameters: "{\"text\":\"world\"}",
|
||||
Parameters: "{\"y\":\"world\"}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -212,7 +214,7 @@ func TestScheduledWorkflow(t *testing.T) {
|
|||
},
|
||||
},
|
||||
Workflow: &scheduledworkflow.WorkflowResource{
|
||||
Parameters: []scheduledworkflow.Parameter{{Name: "text", Value: "\"world\""}},
|
||||
Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}},
|
||||
Spec: "",
|
||||
},
|
||||
NoCatchup: util.BoolPointer(true),
|
||||
|
|
@ -267,9 +269,14 @@ func TestIsPlatformSpecWithKubernetesConfig(t *testing.T) {
|
|||
|
||||
func TestNewTemplate_V2(t *testing.T) {
|
||||
template := loadYaml(t, "testdata/hello_world.yaml")
|
||||
expectedSpecJson, _ := yaml.YAMLToJSON([]byte(template))
|
||||
var yamlData map[string]interface{}
|
||||
err := goyaml.Unmarshal([]byte(template), &yamlData)
|
||||
assert.Nil(t, err)
|
||||
jsonData, err := json.Marshal(yamlData)
|
||||
assert.Nil(t, err)
|
||||
var expectedSpec pipelinespec.PipelineSpec
|
||||
protojson.Unmarshal(expectedSpecJson, &expectedSpec)
|
||||
err = protojson.Unmarshal(jsonData, &expectedSpec)
|
||||
assert.Nil(t, err)
|
||||
expectedTemplate := &V2Spec{
|
||||
spec: &expectedSpec,
|
||||
}
|
||||
|
|
@ -284,11 +291,19 @@ func TestNewTemplate_WithPlatformSpec(t *testing.T) {
|
|||
var expectedPlatformSpec pipelinespec.PlatformSpec
|
||||
|
||||
splitTemplate := strings.Split(template, "\n---\n")
|
||||
expectedSpecJson, _ := yaml.YAMLToJSON([]byte(splitTemplate[0]))
|
||||
protojson.Unmarshal(expectedSpecJson, &expectedPipelineSpec)
|
||||
var pipelineSpecData map[string]interface{}
|
||||
err := goyaml.Unmarshal([]byte(splitTemplate[0]), &pipelineSpecData)
|
||||
assert.Nil(t, err)
|
||||
jsonData, err := json.Marshal(pipelineSpecData)
|
||||
assert.Nil(t, err)
|
||||
protojson.Unmarshal(jsonData, &expectedPipelineSpec)
|
||||
|
||||
expectedPlatformSpecJson, _ := yaml.YAMLToJSON([]byte(splitTemplate[1]))
|
||||
protojson.Unmarshal(expectedPlatformSpecJson, &expectedPlatformSpec)
|
||||
var platformSpecData map[string]interface{}
|
||||
err = goyaml.Unmarshal([]byte(splitTemplate[1]), &platformSpecData)
|
||||
assert.Nil(t, err)
|
||||
jsonData, err = json.Marshal(platformSpecData)
|
||||
assert.Nil(t, err)
|
||||
protojson.Unmarshal(jsonData, &expectedPlatformSpec)
|
||||
|
||||
expectedTemplate := &V2Spec{
|
||||
spec: &expectedPipelineSpec,
|
||||
|
|
@ -301,11 +316,7 @@ func TestNewTemplate_WithPlatformSpec(t *testing.T) {
|
|||
|
||||
func TestNewTemplate_V2_InvalidSchemaVersion(t *testing.T) {
|
||||
template := loadYaml(t, "testdata/hello_world_schema_2_0_0.yaml")
|
||||
expectedSpecJson, _ := yaml.YAMLToJSON([]byte(template))
|
||||
var expectedSpec pipelinespec.PipelineSpec
|
||||
err := protojson.Unmarshal(expectedSpecJson, &expectedSpec)
|
||||
assert.Nil(t, err)
|
||||
_, err = New([]byte(template))
|
||||
_, err := New([]byte(template))
|
||||
assert.NotNil(t, err)
|
||||
assert.Contains(t, err.Error(), "KFP only supports schema version 2.1.0")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,12 +45,12 @@ root:
|
|||
inputs:
|
||||
parameters:
|
||||
text:
|
||||
componentInputParameter: text
|
||||
componentInputParameter: y
|
||||
taskInfo:
|
||||
name: hello-world
|
||||
inputDefinitions:
|
||||
parameters:
|
||||
text:
|
||||
y:
|
||||
parameterType: STRING
|
||||
schemaVersion: 2.1.0
|
||||
sdkVersion: kfp-1.6.5
|
||||
sdkVersion: kfp-1.6.5
|
||||
|
|
|
|||
|
|
@ -16,7 +16,10 @@ package template
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
|
|
@ -27,7 +30,7 @@ import (
|
|||
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
|
||||
"github.com/kubeflow/pipelines/backend/src/v2/compiler/argocompiler"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
goyaml "gopkg.in/yaml.v2"
|
||||
goyaml "gopkg.in/yaml.v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
|
@ -125,29 +128,37 @@ func (t *V2Spec) GetTemplateType() TemplateType {
|
|||
}
|
||||
|
||||
func NewV2SpecTemplate(template []byte) (*V2Spec, error) {
|
||||
var spec pipelinespec.PipelineSpec
|
||||
var v2Spec V2Spec
|
||||
decoder := goyaml.NewDecoder(bytes.NewReader(template))
|
||||
for {
|
||||
var value map[string]interface{}
|
||||
|
||||
err := decoder.Decode(&value)
|
||||
// Break at end of file
|
||||
if decoder.Decode(&value) != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
valueBytes, err := goyaml.Marshal(value)
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("unable to decode yaml document: %s", err.Error()))
|
||||
}
|
||||
valueBytes, err := goyaml.Marshal(&value)
|
||||
if err != nil {
|
||||
return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("unable to marshal this yaml document: %s", err.Error()))
|
||||
}
|
||||
if isPipelineSpec(valueBytes) {
|
||||
if isPipelineSpec(value) {
|
||||
// Pick out the yaml document with pipeline spec
|
||||
if v2Spec.spec != nil {
|
||||
return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, "multiple pipeline specs provided")
|
||||
}
|
||||
pipelineSpecJson, err := yaml.YAMLToJSON(valueBytes)
|
||||
jsonData, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("cannot convert v2 pipeline spec to json format: %s", err.Error()))
|
||||
}
|
||||
err = protojson.Unmarshal(pipelineSpecJson, &spec)
|
||||
var spec pipelinespec.PipelineSpec
|
||||
err = protojson.Unmarshal(jsonData, &spec)
|
||||
if err != nil {
|
||||
return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("invalid v2 pipeline spec: %s", err.Error()))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ import (
|
|||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
"gopkg.in/yaml.v2"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ gopkg.in/jcmturner/dnsutils.v1,https://github.com/jcmturner/dnsutils/blob/v1.0.1
|
|||
gopkg.in/jcmturner/gokrb5.v5,https://github.com/jcmturner/gokrb5/blob/v5.3.0/LICENSE,Apache-2.0
|
||||
gopkg.in/jcmturner/rpc.v0/ndr,https://github.com/jcmturner/rpc/blob/v0.0.2/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v2,https://github.com/go-yaml/yaml/blob/v2.4.0/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/496545a6307b/LICENSE,MIT
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/v3.0.1/LICENSE,MIT
|
||||
k8s.io/api,https://github.com/kubernetes/api/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/pkg,https://github.com/kubernetes/apimachinery/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/third_party/forked/golang,https://github.com/kubernetes/apimachinery/blob/v0.24.3/third_party/forked/golang/LICENSE,BSD-3-Clause
|
||||
|
|
|
|||
|
|
|
@ -87,7 +87,7 @@ gopkg.in/jcmturner/dnsutils.v1,https://github.com/jcmturner/dnsutils/blob/v1.0.1
|
|||
gopkg.in/jcmturner/gokrb5.v5,https://github.com/jcmturner/gokrb5/blob/v5.3.0/LICENSE,Apache-2.0
|
||||
gopkg.in/jcmturner/rpc.v0/ndr,https://github.com/jcmturner/rpc/blob/v0.0.2/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v2,https://github.com/go-yaml/yaml/blob/v2.4.0/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/496545a6307b/LICENSE,MIT
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/v3.0.1/LICENSE,MIT
|
||||
k8s.io/api,https://github.com/kubernetes/api/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/pkg,https://github.com/kubernetes/apimachinery/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/third_party/forked/golang,https://github.com/kubernetes/apimachinery/blob/v0.24.3/third_party/forked/golang/LICENSE,BSD-3-Clause
|
||||
|
|
|
|||
|
|
|
@ -92,7 +92,7 @@ gopkg.in/jcmturner/dnsutils.v1,https://github.com/jcmturner/dnsutils/blob/v1.0.1
|
|||
gopkg.in/jcmturner/gokrb5.v5,https://github.com/jcmturner/gokrb5/blob/v5.3.0/LICENSE,Apache-2.0
|
||||
gopkg.in/jcmturner/rpc.v0/ndr,https://github.com/jcmturner/rpc/blob/v0.0.2/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v2,https://github.com/go-yaml/yaml/blob/v2.4.0/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/496545a6307b/LICENSE,MIT
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/v3.0.1/LICENSE,MIT
|
||||
k8s.io/api,https://github.com/kubernetes/api/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/pkg,https://github.com/kubernetes/apimachinery/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/third_party/forked/golang,https://github.com/kubernetes/apimachinery/blob/v0.24.3/third_party/forked/golang/LICENSE,BSD-3-Clause
|
||||
|
|
|
|||
|
|
|
@ -94,7 +94,7 @@ gopkg.in/jcmturner/dnsutils.v1,https://github.com/jcmturner/dnsutils/blob/v1.0.1
|
|||
gopkg.in/jcmturner/gokrb5.v5,https://github.com/jcmturner/gokrb5/blob/v5.3.0/LICENSE,Apache-2.0
|
||||
gopkg.in/jcmturner/rpc.v0/ndr,https://github.com/jcmturner/rpc/blob/v0.0.2/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v2,https://github.com/go-yaml/yaml/blob/v2.4.0/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/496545a6307b/LICENSE,MIT
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/v3.0.1/LICENSE,MIT
|
||||
k8s.io/api,https://github.com/kubernetes/api/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/pkg,https://github.com/kubernetes/apimachinery/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/third_party/forked/golang,https://github.com/kubernetes/apimachinery/blob/v0.24.3/third_party/forked/golang/LICENSE,BSD-3-Clause
|
||||
|
|
|
|||
|
|
|
@ -45,7 +45,7 @@ gomodules.xyz/jsonpatch/v2,https://github.com/gomodules/jsonpatch/blob/v2.2.0/v2
|
|||
google.golang.org/protobuf,https://github.com/protocolbuffers/protobuf-go/blob/v1.27.1/LICENSE,BSD-3-Clause
|
||||
gopkg.in/inf.v0,https://github.com/go-inf/inf/blob/v0.9.1/LICENSE,BSD-3-Clause
|
||||
gopkg.in/yaml.v2,https://github.com/go-yaml/yaml/blob/v2.4.0/LICENSE,Apache-2.0
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/496545a6307b/LICENSE,MIT
|
||||
gopkg.in/yaml.v3,https://github.com/go-yaml/yaml/blob/v3.0.1/LICENSE,MIT
|
||||
k8s.io/api,https://github.com/kubernetes/api/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
k8s.io/apiextensions-apiserver/pkg/apis/apiextensions,https://github.com/kubernetes/apiextensions-apiserver/blob/v0.23.3/LICENSE,Apache-2.0
|
||||
k8s.io/apimachinery/pkg,https://github.com/kubernetes/apimachinery/blob/v0.24.3/LICENSE,Apache-2.0
|
||||
|
|
|
|||
|
2
go.mod
2
go.mod
|
|
@ -48,7 +48,7 @@ require (
|
|||
google.golang.org/grpc v1.44.0
|
||||
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0
|
||||
google.golang.org/protobuf v1.27.1
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
k8s.io/api v0.24.3
|
||||
k8s.io/apimachinery v0.24.3
|
||||
k8s.io/client-go v0.24.3
|
||||
|
|
|
|||
|
|
@ -2098,8 +2098,9 @@ gopkg.in/yaml.v3 v3.0.0-20190905181640-827449938966/go.mod h1:K4uyk7z7BCEPqu6E+C
|
|||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
|
||||
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
|
||||
|
|
|
|||
Loading…
Reference in New Issue