fix(sdk): fix output type of importer (#8172)
* change output type of importer to artifact_class.TYPE_NAME * use the same logic as component_factory._annotation_to_type_struct for determining type_name * format using yapf * fix type_name * format using yapf * add yaml file * changes with regard to PR comments * format pipeline_with_importer_and_gcpc_types.py * remove unused imports * change name of test component * add pipeline_with_importer_and_gcpc_types to config * update yaml file
This commit is contained in:
parent
f25127fe76
commit
5ccf53a251
|
@ -16,6 +16,7 @@ CONFIG = {
|
|||
'pipelines': {
|
||||
'test_cases': [
|
||||
'pipeline_with_importer',
|
||||
'pipeline_with_importer_and_gcpc_types',
|
||||
'pipeline_with_ontology',
|
||||
'pipeline_with_if_placeholder',
|
||||
'pipeline_with_concat_placeholder',
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
# Copyright 2022 The Kubeflow Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Pipeline using dsl.importer and GCPC types."""
|
||||
|
||||
from kfp import compiler
|
||||
from kfp import components
|
||||
from kfp import dsl
|
||||
from kfp.dsl import importer
|
||||
|
||||
|
||||
class VertexDataset(dsl.Artifact):
    """Artifact type mirroring a GCPC Vertex Dataset.

    ``TYPE_NAME`` is namespaced under ``google.`` so the importer uses it
    verbatim as the output schema title rather than falling back to the
    class name (the non-``system.*`` branch of the importer factory).
    """
    TYPE_NAME = 'google.VertexDataset'
|
||||
|
||||
|
||||
# Inline component definition; the `google.VertexDataset` input type must
# match VertexDataset.TYPE_NAME so the importer output can be wired to it.
_CONSUMER_OP_TEXT = """
name: consumer_op
inputs:
- {name: dataset, type: google.VertexDataset}
implementation:
  container:
    image: dummy
    command:
    - cmd
    args:
    - {inputPath: dataset}
"""

consumer_op = components.load_component_from_text(_CONSUMER_OP_TEXT)
|
||||
|
||||
|
||||
@dsl.pipeline(
    name='pipeline-with-importer-and-gcpc-type', pipeline_root='dummy_root')
def my_pipeline():
    """Imports a GCS artifact as a google.VertexDataset and consumes it."""
    # The importer materializes the URI as a VertexDataset artifact; the
    # consumer's input type matches its TYPE_NAME, so the edge type-checks.
    import_task = importer(
        artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
        artifact_class=VertexDataset,
        reimport=False,
        metadata={'key': 'value'})
    consume_task = consumer_op(dataset=import_task.output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Compile next to this file, swapping the .py suffix for .yaml.
    output_path = __file__.replace('.py', '.yaml')
    compiler.Compiler().compile(
        pipeline_func=my_pipeline, package_path=output_path)
|
|
@ -0,0 +1,74 @@
|
|||
# Compiled PipelineSpec (KFP v2 IR) for pipeline-with-importer-and-gcpc-type.
# NOTE(review): nesting reconstructed to the standard IR layout — the scraped
# source had its indentation stripped; verify against a fresh compile.
components:
  comp-consumer-op:
    executorLabel: exec-consumer-op
    inputDefinitions:
      artifacts:
        dataset:
          artifactType:
            schemaTitle: google.VertexDataset
            schemaVersion: 0.0.1
  comp-importer:
    executorLabel: exec-importer
    inputDefinitions:
      parameters:
        uri:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        artifact:
          artifactType:
            schemaTitle: google.VertexDataset
            schemaVersion: 0.0.1
defaultPipelineRoot: dummy_root
deploymentSpec:
  executors:
    exec-consumer-op:
      container:
        args:
        - '{{$.inputs.artifacts[''dataset''].path}}'
        command:
        - cmd
        image: dummy
    exec-importer:
      importer:
        artifactUri:
          constant: gs://ml-pipeline-playground/shakespeare1.txt
        metadata:
          key: value
        typeSchema:
          schemaTitle: google.VertexDataset
          schemaVersion: 0.0.1
pipelineInfo:
  name: pipeline-with-importer-and-gcpc-type
root:
  dag:
    tasks:
      consumer-op:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-consumer-op
        dependentTasks:
        - importer
        inputs:
          artifacts:
            dataset:
              taskOutputArtifact:
                outputArtifactKey: artifact
                producerTask: importer
        taskInfo:
          name: consumer-op
      importer:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-importer
        inputs:
          parameters:
            uri:
              runtimeValue:
                constant: gs://ml-pipeline-playground/shakespeare1.txt
        taskInfo:
          name: importer
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.3
|
|
@ -54,6 +54,14 @@ def importer(
|
|||
reimport=False)
|
||||
train(dataset=importer1.output)
|
||||
"""
|
||||
if issubclass(artifact_class, artifact_types.Artifact
|
||||
) and not artifact_class.TYPE_NAME.startswith('system.'):
|
||||
# For artifact classes not under the `system` namespace,
|
||||
# use its TYPE_NAME as-is.
|
||||
type_name = artifact_class.TYPE_NAME
|
||||
else:
|
||||
type_name = artifact_class.__name__
|
||||
|
||||
component_spec = structures.ComponentSpec(
|
||||
name='importer',
|
||||
implementation=structures.Implementation(
|
||||
|
@ -64,9 +72,7 @@ def importer(
|
|||
reimport=reimport,
|
||||
metadata=metadata)),
|
||||
inputs={INPUT_KEY: structures.InputSpec(type='String')},
|
||||
outputs={
|
||||
OUTPUT_KEY: structures.OutputSpec(type=artifact_class.__name__)
|
||||
},
|
||||
outputs={OUTPUT_KEY: structures.OutputSpec(type=type_name)},
|
||||
)
|
||||
|
||||
importer = importer_component.ImporterComponent(
|
||||
|
|
Loading…
Reference in New Issue