name: Run component or pipeline
metadata:
  annotations:
    author: Alexey Volkov
    canonical_location: 'https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/kfp/Run_component/component.yaml'
inputs:
- {name: component_url, type: Url}
- {name: arguments, type: JsonObject}
- {name: endpoint, type: String, optional: true}
- {name: wait_timeout_seconds, type: Float, optional: true}
outputs:
- {name: run_id, type: String}
- {name: run_object, type: JsonObject}
implementation:
  container:
    image: python:3.9
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.4.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.4.0' --user) && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def run_component_or_pipeline(
          component_url,
          arguments,
          endpoint = None,
          wait_timeout_seconds = None,
      ):
          import json
          import os
          import kfp
          from kfp_server_api import ApiClient

          print('Loading component...')
          op = kfp.components.load_component_from_url(component_url)
          print('Loading component done.')

          print('Submitting run...')
          if not endpoint:
              endpoint = 'http://' + os.environ['ML_PIPELINE_SERVICE_HOST'] + ':' + os.environ['ML_PIPELINE_SERVICE_PORT']
          create_run_result = kfp.Client(host=endpoint).create_run_from_pipeline_func(op, arguments=arguments)
          run_id = str(create_run_result.run_id)
          print('Submitted run: ' + run_id)
          run_url = f'{endpoint.rstrip("/")}/#/runs/details/{run_id}'
          print(run_url)

          print('Waiting for the run to finish...')
          run_object = create_run_result.wait_for_run_completion(wait_timeout_seconds)
          print('Run has finished.')
          # sanitize_for_serialization uses correct field names and properly converts datetime values
          run_dict = ApiClient().sanitize_for_serialization(run_object)
          return (
              run_id,
              json.dumps(run_dict, indent=4),
          )

      def _serialize_json(obj) -> str:
          if isinstance(obj, str):
              return obj
          import json

          def default_serializer(obj):
              if hasattr(obj, 'to_struct'):
                  return obj.to_struct()
              else:
                  raise TypeError(
                      "Object of type '%s' is not JSON serializable and does not have .to_struct() method."
                      % obj.__class__.__name__)

          return json.dumps(obj, default=default_serializer, sort_keys=True)

      def _serialize_str(str_value: str) -> str:
          if not isinstance(str_value, str):
              raise TypeError('Value "{}" has type "{}" instead of str.'.format(
                  str(str_value), str(type(str_value))))
          return str_value

      import json
      import argparse
      _parser = argparse.ArgumentParser(prog='Run component or pipeline', description='')
      _parser.add_argument("--component-url", dest="component_url", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--arguments", dest="arguments", type=json.loads, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--endpoint", dest="endpoint", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--wait-timeout-seconds", dest="wait_timeout_seconds", type=float, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = run_component_or_pipeline(**_parsed_args)

      _output_serializers = [
          _serialize_str,
          _serialize_json,
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --component-url
    - {inputValue: component_url}
    - --arguments
    - {inputValue: arguments}
    - if:
        cond: {isPresent: endpoint}
        then:
        - --endpoint
        - {inputValue: endpoint}
    - if:
        cond: {isPresent: wait_timeout_seconds}
        then:
        - --wait-timeout-seconds
        - {inputValue: wait_timeout_seconds}
    - '----output-paths'
    - {outputPath: run_id}
    - {outputPath: run_object}
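# Usage sketch (not part of the component spec): a minimal example of how this
# component could be loaded and wired into a KFP v1 pipeline. The endpoint
# 'http://ml-pipeline:8888', the inner component URL, and the argument names
# below are assumptions for illustration only.
#
#   import kfp
#
#   run_op = kfp.components.load_component_from_url(
#       'https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/kfp/Run_component/component.yaml'
#   )
#
#   def my_pipeline():
#       # Submit another component as its own run; the dict is passed to the
#       # JsonObject-typed `arguments` input (assumed to serialize as JSON).
#       run_op(
#           component_url='https://example.com/some_component.yaml',  # hypothetical
#           arguments={'input_1': 'value_1'},                         # hypothetical
#       )
#
#   kfp.Client(host='http://ml-pipeline:8888').create_run_from_pipeline_func(
#       my_pipeline, arguments={})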