# Copyright 2021 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for kfp.dsl.component_spec.""" from absl.testing import parameterized from kfp.deprecated.components import _structures as structures from kfp.deprecated.dsl import _pipeline_param from kfp.deprecated.dsl import component_spec as dsl_component_spec from kfp.pipeline_spec import pipeline_spec_pb2 from google.protobuf import json_format class ComponentSpecTest(parameterized.TestCase): TEST_PIPELINE_PARAMS = [ _pipeline_param.PipelineParam( name='output1', param_type='Dataset', op_name='op-1'), _pipeline_param.PipelineParam( name='output2', param_type='Integer', op_name='op-2'), _pipeline_param.PipelineParam( name='output3', param_type='Model', op_name='op-3'), _pipeline_param.PipelineParam( name='output4', param_type='Double', op_name='op-4'), _pipeline_param.PipelineParam( name='arg_input', param_type='String', op_name=None), ] def setUp(self): self.maxDiff = None def test_build_component_spec_from_structure(self): structure_component_spec = structures.ComponentSpec( name='component1', description='component1 desc', inputs=[ structures.InputSpec( name='input1', description='input1 desc', type='Dataset'), structures.InputSpec( name='input2', description='input2 desc', type='String'), structures.InputSpec( name='input3', description='input3 desc', type='Integer'), structures.InputSpec( name='input4', description='optional inputs', optional=True), ], outputs=[ structures.OutputSpec( name='output1', description='output1 desc', type='Model') ]) expected_dict = { 'inputDefinitions': { 'artifacts': { 'input1': { 'artifactType': { 'schemaTitle': 'system.Dataset', 'schemaVersion': '0.0.1' } } }, 'parameters': { 'input2': { 'parameterType': 'STRING' }, 'input3': { 'parameterType': 'NUMBER_INTEGER' } } }, 'outputDefinitions': { 'artifacts': { 'output1': { 'artifactType': { 'schemaTitle': 'system.Model', 'schemaVersion': '0.0.1' } } } }, 'executorLabel': 'exec-component1' } expected_spec = pipeline_spec_pb2.ComponentSpec() json_format.ParseDict(expected_dict, expected_spec) component_spec = ( dsl_component_spec.build_component_spec_from_structure( component_spec=structure_component_spec, executor_label='exec-component1', actual_inputs=['input1', 'input2', 'input3'], )) self.assertEqual(expected_spec, component_spec) @parameterized.parameters( { 'is_root_component': True, 'expected_result': { 'inputDefinitions': { 'artifacts': { 'input1': { 'artifactType': { 'schemaTitle': 'system.Dataset', 'schemaVersion': '0.0.1' } } }, 'parameters': { 'input2': { 'parameterType': 'NUMBER_INTEGER' }, 'input3': { 'parameterType': 'STRING' }, 'input4': { 'parameterType': 'NUMBER_DOUBLE' } } } } }, { 'is_root_component': False, 'expected_result': { 'inputDefinitions': { 'artifacts': { 'pipelineparam--input1': { 'artifactType': { 'schemaTitle': 'system.Dataset', 'schemaVersion': '0.0.1' } } }, 'parameters': { 'pipelineparam--input2': { 'parameterType': 'NUMBER_INTEGER' }, 'pipelineparam--input3': { 'parameterType': 'STRING' }, 'pipelineparam--input4': { 'parameterType': 'NUMBER_DOUBLE' } } } } }, ) def test_build_component_inputs_spec(self, is_root_component, expected_result): pipeline_params = [ _pipeline_param.PipelineParam(name='input1', param_type='Dataset'), _pipeline_param.PipelineParam(name='input2', param_type='Integer'), _pipeline_param.PipelineParam(name='input3', param_type='String'), _pipeline_param.PipelineParam(name='input4', param_type='Float'), ] expected_spec = pipeline_spec_pb2.ComponentSpec() json_format.ParseDict(expected_result, expected_spec) component_spec = pipeline_spec_pb2.ComponentSpec() dsl_component_spec.build_component_inputs_spec(component_spec, pipeline_params, is_root_component) self.assertEqual(expected_spec, component_spec) def test_build_component_outputs_spec(self): pipeline_params = [ _pipeline_param.PipelineParam(name='output1', param_type='Dataset'), _pipeline_param.PipelineParam(name='output2', param_type='Integer'), _pipeline_param.PipelineParam(name='output3', param_type='String'), _pipeline_param.PipelineParam(name='output4', param_type='Float'), ] expected_dict = { 'outputDefinitions': { 'artifacts': { 'output1': { 'artifactType': { 'schemaTitle': 'system.Dataset', 'schemaVersion': '0.0.1' } } }, 'parameters': { 'output2': { 'parameterType': 'NUMBER_INTEGER' }, 'output3': { 'parameterType': 'STRING' }, 'output4': { 'parameterType': 'NUMBER_DOUBLE' } } } } expected_spec = pipeline_spec_pb2.ComponentSpec() json_format.ParseDict(expected_dict, expected_spec) component_spec = pipeline_spec_pb2.ComponentSpec() dsl_component_spec.build_component_outputs_spec(component_spec, pipeline_params) self.assertEqual(expected_spec, component_spec) @parameterized.parameters( { 'is_parent_component_root': True, 'expected_result': { 'inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'taskOutputArtifact': { 'producerTask': 'op-1', 'outputArtifactKey': 'output1' } }, 'pipelineparam--op-3-output3': { 'componentInputArtifact': 'op-3-output3' } }, 'parameters': { 'pipelineparam--op-2-output2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } }, 'pipelineparam--op-4-output4': { 'componentInputParameter': 'op-4-output4' }, 'pipelineparam--arg_input': { 'componentInputParameter': 'arg_input' } } } } }, { 'is_parent_component_root': False, 'expected_result': { 'inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'taskOutputArtifact': { 'producerTask': 'op-1', 'outputArtifactKey': 'output1' } }, 'pipelineparam--op-3-output3': { 'componentInputArtifact': 'pipelineparam--op-3-output3' } }, 'parameters': { 'pipelineparam--op-2-output2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } }, 'pipelineparam--op-4-output4': { 'componentInputParameter': 'pipelineparam--op-4-output4' }, 'pipelineparam--arg_input': { 'componentInputParameter': 'pipelineparam--arg_input' } } } } }, ) def test_build_task_inputs_spec(self, is_parent_component_root, expected_result): pipeline_params = self.TEST_PIPELINE_PARAMS tasks_in_current_dag = ['op-1', 'op-2'] expected_spec = pipeline_spec_pb2.PipelineTaskSpec() json_format.ParseDict(expected_result, expected_spec) task_spec = pipeline_spec_pb2.PipelineTaskSpec() dsl_component_spec.build_task_inputs_spec(task_spec, pipeline_params, tasks_in_current_dag, is_parent_component_root) self.assertEqual(expected_spec, task_spec) @parameterized.parameters( { 'original_task_spec': {}, 'parent_component_inputs': {}, 'tasks_in_current_dag': [], 'input_parameters_in_current_dag': [], 'input_artifacts_in_current_dag': [], 'expected_result': {}, }, { # Depending on tasks & inputs within the current DAG. 'original_task_spec': { 'inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'taskOutputArtifact': { 'producerTask': 'op-1', 'outputArtifactKey': 'output1' } }, 'artifact1': { 'componentInputArtifact': 'artifact1' }, }, 'parameters': { 'pipelineparam--op-2-output2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } }, 'param1': { 'componentInputParameter': 'param1' }, } } }, 'parent_component_inputs': { 'artifacts': { 'artifact1': { 'artifactType': { 'instanceSchema': 'dummy_schema' } }, }, 'parameters': { 'param1': { 'parameterType': 'STRING' }, } }, 'tasks_in_current_dag': ['op-1', 'op-2'], 'input_parameters_in_current_dag': ['param1'], 'input_artifacts_in_current_dag': ['artifact1'], 'expected_result': { 'inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'taskOutputArtifact': { 'producerTask': 'op-1', 'outputArtifactKey': 'output1' } }, 'artifact1': { 'componentInputArtifact': 'artifact1' }, }, 'parameters': { 'pipelineparam--op-2-output2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } }, 'param1': { 'componentInputParameter': 'param1' }, } } }, }, { # Depending on tasks and inputs not available in the current DAG. 'original_task_spec': { 'inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'taskOutputArtifact': { 'producerTask': 'op-1', 'outputArtifactKey': 'output1' } }, 'artifact1': { 'componentInputArtifact': 'artifact1' }, }, 'parameters': { 'pipelineparam--op-2-output2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } }, 'param1': { 'componentInputParameter': 'param1' }, } } }, 'parent_component_inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'artifactType': { 'instanceSchema': 'dummy_schema' } }, 'pipelineparam--artifact1': { 'artifactType': { 'instanceSchema': 'dummy_schema' } }, }, 'parameters': { 'pipelineparam--op-2-output2' : { 'parameterType': 'NUMBER_INTEGER' }, 'pipelineparam--param1': { 'parameterType': 'STRING' }, } }, 'tasks_in_current_dag': ['op-3'], 'input_parameters_in_current_dag': ['pipelineparam--op-2-output2', 'pipelineparam--param1'], 'input_artifacts_in_current_dag': ['pipelineparam--op-1-output1', 'pipelineparam--artifact1'], 'expected_result': { 'inputs': { 'artifacts': { 'pipelineparam--op-1-output1': { 'componentInputArtifact': 'pipelineparam--op-1-output1' }, 'artifact1': { 'componentInputArtifact': 'pipelineparam--artifact1' }, }, 'parameters': { 'pipelineparam--op-2-output2': { 'componentInputParameter': 'pipelineparam--op-2-output2' }, 'param1': { 'componentInputParameter': 'pipelineparam--param1' }, } } }, }, ) def test_update_task_inputs_spec(self, original_task_spec, parent_component_inputs, tasks_in_current_dag, input_parameters_in_current_dag, input_artifacts_in_current_dag, expected_result): pipeline_params = self.TEST_PIPELINE_PARAMS expected_spec = pipeline_spec_pb2.PipelineTaskSpec() json_format.ParseDict(expected_result, expected_spec) task_spec = pipeline_spec_pb2.PipelineTaskSpec() json_format.ParseDict(original_task_spec, task_spec) parent_component_inputs_spec = pipeline_spec_pb2.ComponentInputsSpec() json_format.ParseDict(parent_component_inputs, parent_component_inputs_spec) dsl_component_spec.update_task_inputs_spec( task_spec, parent_component_inputs_spec, pipeline_params, tasks_in_current_dag, input_parameters_in_current_dag, input_artifacts_in_current_dag) self.assertEqual(expected_spec, task_spec) def test_pop_input_from_component_spec(self): component_spec = pipeline_spec_pb2.ComponentSpec( executor_label='exec-component1') component_spec.input_definitions.artifacts[ 'input1'].artifact_type.schema_title = 'system.Dataset' component_spec.input_definitions.parameters[ 'input2'].parameter_type = pipeline_spec_pb2.ParameterType.STRING component_spec.input_definitions.parameters[ 'input3'].parameter_type = pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE # pop an artifact, and there're other inputs left dsl_component_spec.pop_input_from_component_spec( component_spec, 'input1') expected_dict = { 'inputDefinitions': { 'parameters': { 'input2': { 'parameterType': 'STRING' }, 'input3': { 'parameterType': 'NUMBER_DOUBLE' } } }, 'executorLabel': 'exec-component1' } expected_spec = pipeline_spec_pb2.ComponentSpec() json_format.ParseDict(expected_dict, expected_spec) self.assertEqual(expected_spec, component_spec) # pop an parameter, and there're other inputs left dsl_component_spec.pop_input_from_component_spec( component_spec, 'input2') expected_dict = { 'inputDefinitions': { 'parameters': { 'input3': { 'parameterType': 'NUMBER_DOUBLE' } } }, 'executorLabel': 'exec-component1' } expected_spec = pipeline_spec_pb2.ComponentSpec() json_format.ParseDict(expected_dict, expected_spec) self.assertEqual(expected_spec, component_spec) # pop the last input, expect no inputDefinitions dsl_component_spec.pop_input_from_component_spec( component_spec, 'input3') expected_dict = {'executorLabel': 'exec-component1'} expected_spec = pipeline_spec_pb2.ComponentSpec() json_format.ParseDict(expected_dict, expected_spec) self.assertEqual(expected_spec, component_spec) # pop an input that doesn't exist, expect no-op. dsl_component_spec.pop_input_from_component_spec( component_spec, 'input4') self.assertEqual(expected_spec, component_spec) def test_pop_input_from_task_spec(self): task_spec = pipeline_spec_pb2.PipelineTaskSpec() task_spec.component_ref.name = 'comp-component1' task_spec.inputs.artifacts[ 'input1'].task_output_artifact.producer_task = 'op-1' task_spec.inputs.artifacts[ 'input1'].task_output_artifact.output_artifact_key = 'output1' task_spec.inputs.parameters[ 'input2'].task_output_parameter.producer_task = 'op-2' task_spec.inputs.parameters[ 'input2'].task_output_parameter.output_parameter_key = 'output2' task_spec.inputs.parameters[ 'input3'].component_input_parameter = 'op3-output3' # pop an parameter, and there're other inputs left dsl_component_spec.pop_input_from_task_spec(task_spec, 'input3') expected_dict = { 'inputs': { 'artifacts': { 'input1': { 'taskOutputArtifact': { 'producerTask': 'op-1', 'outputArtifactKey': 'output1' } } }, 'parameters': { 'input2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } } } }, 'component_ref': { 'name': 'comp-component1' } } expected_spec = pipeline_spec_pb2.PipelineTaskSpec() json_format.ParseDict(expected_dict, expected_spec) self.assertEqual(expected_spec, task_spec) # pop an artifact, and there're other inputs left dsl_component_spec.pop_input_from_task_spec(task_spec, 'input1') expected_dict = { 'inputs': { 'parameters': { 'input2': { 'taskOutputParameter': { 'producerTask': 'op-2', 'outputParameterKey': 'output2' } } } }, 'component_ref': { 'name': 'comp-component1' } } expected_spec = pipeline_spec_pb2.PipelineTaskSpec() json_format.ParseDict(expected_dict, expected_spec) self.assertEqual(expected_spec, task_spec) # pop the last input, expect no inputDefinitions dsl_component_spec.pop_input_from_task_spec(task_spec, 'input2') expected_dict = {'component_ref': {'name': 'comp-component1'}} expected_spec = pipeline_spec_pb2.PipelineTaskSpec() json_format.ParseDict(expected_dict, expected_spec) self.assertEqual(expected_spec, task_spec) # pop an input that doesn't exist, expect no-op. dsl_component_spec.pop_input_from_task_spec(task_spec, 'input4') self.assertEqual(expected_spec, task_spec) def test_additional_input_name_for_pipelineparam(self): self.assertEqual( 'pipelineparam--op1-param1', dsl_component_spec.additional_input_name_for_pipelineparam( _pipeline_param.PipelineParam(name='param1', op_name='op1'))) self.assertEqual( 'pipelineparam--param2', dsl_component_spec.additional_input_name_for_pipelineparam( _pipeline_param.PipelineParam(name='param2'))) self.assertEqual( 'pipelineparam--param3', dsl_component_spec.additional_input_name_for_pipelineparam( 'param3')) if __name__ == '__main__': unittest.main()