fix(sdk): fix v2 sample tests for cuj3 (#8168)

* fix v2 sample tests for cuj3

* separate sh -c

* fix two step pipeline

* fix piping error

* fix nit

* change path to uri to correct cat command

* correct bash script

* remove display name

* change directory reference in commands

* remove old files
This commit is contained in:
Scott_Xu 2022-08-25 15:30:22 -07:00 committed by GitHub
parent dab5460074
commit 121ddcbc04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 34 deletions

View File

@ -124,6 +124,8 @@
path: samples.v2.pipeline_with_importer_test
- name: pipeline_container_no_input
path: samples.v2.pipeline_container_no_input_test
- name: two_step_pipeline_containerized
path: samples.v2.two_step_pipeline_containerized_test
# TODO(capri-xiyue): Re-enable after figuring out V2 Engine
# and protobuf.Value support.
# - name: cache_v2

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Two step pipeline using dsl.container_component decorator."""
import os
from kfp import compiler
from kfp.dsl import container_component
@ -26,25 +25,24 @@ from kfp.dsl import pipeline
@container_component
def component1(text: str, output_gcs: Output[Dataset]):
return ContainerSpec(
image='google/cloud-sdk:slim',
image='alpine',
command=[
'sh -c | set -e -x', 'echo', text, '| gsutil cp -', output_gcs.uri
])
'sh',
'-c',
'mkdir --parents $(dirname "$1") && echo "$0" > "$1"',
],
args=[text, output_gcs.path])
@container_component
def component2(input_gcs: Input[Dataset]):
return ContainerSpec(
image='google/cloud-sdk:slim',
command=['sh', '-c', '|', 'set -e -x gsutil cat'],
args=[input_gcs.path])
return ContainerSpec(image='alpine', command=['cat'], args=[input_gcs.path])
@pipeline(name='two-step-pipeline-containerized')
def two_step_pipeline_containerized():
component_1 = component1(text='hi').set_display_name('Producer')
component_1 = component1(text='hi')
component_2 = component2(input_gcs=component_1.outputs['output_gcs'])
component_2.set_display_name('Consumer')
if __name__ == '__main__':

View File

@ -19,25 +19,25 @@ from kfp import dsl
@dsl.container_component
def component1(text: str, output_gcs: dsl.Output[dsl.Dataset]):
return dsl.ContainerSpec(
image='google/cloud-sdk:slim',
image='alpine',
command=[
'sh -c | set -e -x', 'echo', text, '| gsutil cp -', output_gcs.uri
])
'sh',
'-c',
'mkdir --parents $(dirname "$1") && echo "$0" > "$1"',
],
args=[text, output_gcs.path])
@dsl.container_component
def component2(input_gcs: dsl.Input[dsl.Dataset]):
return dsl.ContainerSpec(
image='google/cloud-sdk:slim',
command=['sh', '-c', '|', 'set -e -x gsutil cat'],
args=[input_gcs.uri])
image='alpine', command=['cat'], args=[input_gcs.path])
@dsl.pipeline(name='containerized-two-step-pipeline')
def my_pipeline(text: str):
component_1 = component1(text=text).set_display_name('Producer')
component_1 = component1(text=text)
component_2 = component2(input_gcs=component_1.outputs['output_gcs'])
component_2.set_display_name('Consumer')
if __name__ == '__main__':

View File

@ -22,24 +22,22 @@ components:
deploymentSpec:
executors:
exec-component1:
container:
command:
- sh -c | set -e -x
- echo
- '{{$.inputs.parameters[''text'']}}'
- '| gsutil cp -'
- '{{$.outputs.artifacts[''output_gcs''].uri}}'
image: google/cloud-sdk:slim
exec-component2:
container:
args:
- '{{$.inputs.artifacts[''input_gcs''].uri}}'
- '{{$.inputs.parameters[''text'']}}'
- '{{$.outputs.artifacts[''output_gcs''].path}}'
command:
- sh
- -c
- '|'
- set -e -x gsutil cat
image: google/cloud-sdk:slim
- mkdir --parents $(dirname "$1") && echo "$0" > "$1"
image: alpine
exec-component2:
container:
args:
- '{{$.inputs.artifacts[''input_gcs''].path}}'
command:
- cat
image: alpine
pipelineInfo:
name: containerized-two-step-pipeline
root:
@ -55,7 +53,7 @@ root:
text:
componentInputParameter: text
taskInfo:
name: Producer
name: component1
component2:
cachingOptions:
enableCache: true
@ -70,10 +68,10 @@ root:
outputArtifactKey: output_gcs
producerTask: component1
taskInfo:
name: Consumer
name: component2
inputDefinitions:
parameters:
text:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.1
sdkVersion: kfp-2.0.0-beta.2