feat(backend): Upload namespaced pipeline definitions. Part of #4197 (#8511)

* feat(backend) Fix authentication in upload requests

Fix the way KFP API server authenticates pipeline upload requests.
We leverage 'isAuthenticated()` function which requires proper
initialization of the context object to include user identity.

* feat(backend): Add namespace field in pipeline upload swagger definition

Extend swagger defintion of the pipeline upload API with a namespace
parameter in order to support uploading namespaced pipelines.

* chore(backend): Generate Go & Python clients

Autogenerate the Go and Python clients after extending the swagger
definitions of upload pipeline APIs with a namespace field.
This commit is contained in:
Ilias Katsakioris 2022-12-01 21:22:44 +02:00 committed by GitHub
parent 6cd7cbcbc4
commit 931c14a742
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 121 additions and 13 deletions

View File

@ -65,6 +65,8 @@ type UploadPipelineParams struct {
Description *string
/*Name*/
Name *string
/*Namespace*/
Namespace *string
/*Uploadfile
The pipeline to upload. Maximum size of 32MB is supported.
@ -131,6 +133,17 @@ func (o *UploadPipelineParams) SetName(name *string) {
o.Name = name
}
// WithNamespace adds the namespace to the upload pipeline params
func (o *UploadPipelineParams) WithNamespace(namespace *string) *UploadPipelineParams {
o.SetNamespace(namespace)
return o
}
// SetNamespace adds the namespace to the upload pipeline params
func (o *UploadPipelineParams) SetNamespace(namespace *string) {
o.Namespace = namespace
}
// WithUploadfile adds the uploadfile to the upload pipeline params
func (o *UploadPipelineParams) WithUploadfile(uploadfile runtime.NamedReadCloser) *UploadPipelineParams {
o.SetUploadfile(uploadfile)
@ -182,6 +195,22 @@ func (o *UploadPipelineParams) WriteToRequest(r runtime.ClientRequest, reg strfm
}
if o.Namespace != nil {
// query param namespace
var qrNamespace string
if o.Namespace != nil {
qrNamespace = *o.Namespace
}
qNamespace := qrNamespace
if qNamespace != "" {
if err := r.SetQueryParam("namespace", qNamespace); err != nil {
return err
}
}
}
// form file param uploadfile
if err := r.SetFileParam("uploadfile", o.Uploadfile); err != nil {
return err

View File

@ -65,6 +65,8 @@ type UploadPipelineVersionParams struct {
Description *string
/*Name*/
Name *string
/*Namespace*/
Namespace *string
/*Pipelineid*/
Pipelineid *string
/*Uploadfile
@ -133,6 +135,17 @@ func (o *UploadPipelineVersionParams) SetName(name *string) {
o.Name = name
}
// WithNamespace adds the namespace to the upload pipeline version params
func (o *UploadPipelineVersionParams) WithNamespace(namespace *string) *UploadPipelineVersionParams {
o.SetNamespace(namespace)
return o
}
// SetNamespace adds the namespace to the upload pipeline version params
func (o *UploadPipelineVersionParams) SetNamespace(namespace *string) {
o.Namespace = namespace
}
// WithPipelineid adds the pipelineid to the upload pipeline version params
func (o *UploadPipelineVersionParams) WithPipelineid(pipelineid *string) *UploadPipelineVersionParams {
o.SetPipelineid(pipelineid)
@ -195,6 +208,22 @@ func (o *UploadPipelineVersionParams) WriteToRequest(r runtime.ClientRequest, re
}
if o.Namespace != nil {
// query param namespace
var qrNamespace string
if o.Namespace != nil {
qrNamespace = *o.Namespace
}
qNamespace := qrNamespace
if qNamespace != "" {
if err := r.SetQueryParam("namespace", qNamespace); err != nil {
return err
}
}
}
if o.Pipelineid != nil {
// query param pipelineid

View File

@ -1406,6 +1406,12 @@
"in": "query",
"required": false,
"type": "string"
},
{
"name": "namespace",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [
@ -1461,6 +1467,12 @@
"in": "query",
"required": false,
"type": "string"
},
{
"name": "namespace",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [

View File

@ -51,6 +51,12 @@
"in": "query",
"required": false,
"type": "string"
},
{
"name": "namespace",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [
@ -106,6 +112,12 @@
"in": "query",
"required": false,
"type": "string"
},
{
"name": "namespace",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [
@ -298,4 +310,4 @@
"Bearer": []
}
]
}
}

View File

@ -21,6 +21,7 @@ import (
"net/http"
"net/url"
"google.golang.org/grpc/metadata"
"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
@ -99,7 +100,11 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req
return
}
err = s.canUploadVersionedPipeline(r, pipelineNamespace)
resourceAttributes := &authorizationv1.ResourceAttributes{
Namespace: pipelineNamespace,
Verb: common.RbacResourceVerbCreate,
}
err = s.canUploadVersionedPipeline(r, "", resourceAttributes)
if err != nil {
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Authorization to namespace failed."))
return
@ -178,7 +183,11 @@ func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *h
return
}
err = s.canUploadVersionedPipeline(r, namespace)
resourceAttributes := &authorizationv1.ResourceAttributes{
Namespace: namespace,
Verb: common.RbacResourceVerbCreate,
}
err = s.canUploadVersionedPipeline(r, pipelineId, resourceAttributes)
if err != nil {
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Authorization to namespace failed."))
return
@ -221,19 +230,36 @@ func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *h
}
}
func (s *PipelineUploadServer) canUploadVersionedPipeline(r *http.Request, namespace string) error {
if namespace == "" {
func (s *PipelineUploadServer) canUploadVersionedPipeline(r *http.Request, pipelineId string, resourceAttributes *authorizationv1.ResourceAttributes) error {
if !common.IsMultiUserMode() {
// Skip authorization if not multi-user mode.
return nil
}
userIdentityHeader := r.Header.Get(common.GetKubeflowUserIDHeader())
resourceAttributes := &authorizationv1.ResourceAttributes{
Namespace: namespace,
Verb: common.RbacResourceVerbCreate,
Group: common.RbacPipelinesGroup,
Version: common.RbacPipelinesVersion,
Resource: common.RbacResourceTypePipelines,
if len(pipelineId) > 0 {
namespace, err := s.resourceManager.GetNamespaceFromPipelineID(pipelineId)
if err != nil {
return util.Wrap(err, "Failed to authorize with the Pipeline ID.")
}
if len(resourceAttributes.Namespace) == 0 {
resourceAttributes.Namespace = namespace
}
}
err := s.resourceManager.IsRequestAuthorized(context.TODO(), userIdentityHeader, resourceAttributes)
if resourceAttributes.Namespace == "" {
return nil
}
resourceAttributes.Group = common.RbacPipelinesGroup
resourceAttributes.Version = common.RbacPipelinesVersion
resourceAttributes.Resource = common.RbacResourceTypePipelines
ctx := context.Background()
md := metadata.MD{}
for key, values := range r.Header {
md.Set(key, values...)
}
ctx = metadata.NewIncomingContext(ctx, md)
err := isAuthorized(s.resourceManager, ctx, resourceAttributes)
if err != nil {
return util.Wrap(err, "Authorization Failure.")
}